Using Snowflake Connector for Kafka with Snowpipe Streaming

Realtime Data Ingestion

Sandun Dayananda
8 min read · Jun 6, 2024
Image by Author — kafka connector

In this article, I'm focusing on the practical side (if you want to understand Snowpipe Streaming theoretically, read this article). I'll walk you through the following:

— Set up a local Kafka cluster (on your machine)

— Configure the Kafka Connector for Snowflake

— Create some basic Snowflake configurations

— Ingest sample data in real time from Kafka to Snowflake

At the end of this, you will have the ability to build your own pipelines!

Full code, including all the configurations, is on my GitHub: https://github.com/SandunDayananda/snowpipe-streaming

Why do we need Snowpipe Streaming over the traditional Snowpipe approach?

Snowpipe Streaming is a modern approach to data ingestion in Snowflake that offers several advantages over the traditional Snowpipe method. Here are a few reasons why it’s preferred:

  1. Real-Time Data Ingestion: Snowpipe Streaming allows for near real-time data ingestion, which means your data is always up to date. This is crucial for applications that require real-time analytics or immediate insights from the ingested data. In contrast, traditional Snowpipe has to keep data in an intermediate stage in order to do batch ingestion.
  2. Automated Scaling: With Snowpipe Streaming, you don’t have to worry about the infrastructure or scaling issues. It automatically scales up and down based on the volume of incoming data, ensuring efficient resource utilization.
  3. Cost-Effective: Since Snowpipe Streaming only charges for the compute resources actually used, it can be more cost-effective than the traditional approach, especially for sporadic or unpredictable workloads.
  4. Simplified Data Pipeline: Snowpipe Streaming simplifies the data pipeline by eliminating the need for manual intervention in loading data. This reduces the complexity and maintenance overhead of your data pipelines.

Okay, let's implement this.

Basically, Snowpipe Streaming, which is part of the Snowflake Ingest SDK, provides APIs that can be used in two primary ways:

  1. By creating a custom application using Java.
  2. By utilising the Snowflake Connector for Kafka.

Since the Snowflake Connector for Kafka gives us a direct and easier path, we will use it for our work. This eliminates the need to develop a custom application, as all the necessary Snowpipe Streaming API functions are already bundled within the connector (so we just install the connector and play with it, that's all!), enabling a quick start. Besides the Kafka connector, we'll also need Kafka itself to create and publish to our topics, along with a Java runtime.

Step 1

In order to authenticate Kafka with Snowflake, a public/private key pair is used. Open your terminal and run the following commands to generate them. (This will create the key files in your current directory.)

# generate private key first
openssl genrsa -out privatekey.pem 2048

# generate public key from the private key
openssl rsa -in privatekey.pem -pubout -out publickey.crt

Then open those key files and you will see each key spread across several lines. You need to put the key onto a single line. For example, if your key looks like this:

-----BEGIN PUBLIC KEY-----
yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkey
yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkey
yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkey
yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkey
yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkey
yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkey
yourkeyy
-----END PUBLIC KEY-----

after you make it into a single line, it will look like the following (there is no need to keep the -----BEGIN PUBLIC KEY----- and -----END PUBLIC KEY----- parts):

yourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyyourkeyy
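
If you prefer not to edit the key files by hand, a shell one-liner like the following can strip the BEGIN/END lines and the line breaks for you (a minimal sketch, assuming the file names from the commands above):

# print the public key as a single line (drops the BEGIN/END lines and removes line breaks)
grep -v "PUBLIC KEY" publickey.crt | tr -d '\n'; echo

# do the same for the private key; the connector configuration in Step 3 also expects it on a single line
grep -v "PRIVATE KEY" privatekey.pem | tr -d '\n'; echo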

Step 2

In this step, we will do the Snowflake configurations. Before running the following commands, make sure to use a role that has privileges to create and manage roles (or run USE ROLE securityadmin;). Also, remember to assign a warehouse for the schema.

I also recommend using the commands marked with --** in order to minimise access issues.

-- Use a role that can create and manage roles and privileges.
-- USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE my_kafka_connector_role; --**

create database my_kafka_db; --**

-- Create a schema to hold the target table (it is referenced by the grants below and by the connector configuration).
create schema my_kafka_schema; --**

-- create a warehouse
CREATE OR REPLACE WAREHOUSE my_kafka_warehouse WITH WAREHOUSE_SIZE='X-LARGE';

-- Grant privileges on the database.
GRANT USAGE ON DATABASE my_kafka_db TO ROLE my_kafka_connector_role;
-- or
GRANT ALL ON DATABASE my_kafka_db TO ROLE my_kafka_connector_role; --**

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA my_kafka_schema TO ROLE my_kafka_connector_role;
GRANT USAGE ON WAREHOUSE my_kafka_warehouse TO ROLE my_kafka_connector_role;
GRANT CREATE TABLE ON SCHEMA my_kafka_schema TO ROLE my_kafka_connector_role;
GRANT CREATE STAGE ON SCHEMA my_kafka_schema TO ROLE my_kafka_connector_role;
GRANT CREATE PIPE ON SCHEMA my_kafka_schema TO ROLE my_kafka_connector_role;

-- or
GRANT ALL ON SCHEMA my_kafka_schema TO ROLE my_kafka_connector_role; --**

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table TO ROLE my_kafka_connector_role;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage TO ROLE my_kafka_connector_role;

create user my_kafka_connector_user; --**

-- Grant the custom role to an existing user.
GRANT ROLE my_kafka_connector_role TO USER my_kafka_connector_user; --**

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER my_kafka_connector_user SET DEFAULT_ROLE = my_kafka_connector_role; --**

alter user my_kafka_connector_user set RSA_PUBLIC_KEY='put your public key here'; --**

Step 3

Now, we are going to set up Kafka on your local machine.

  1. Install the Java JDK (JDK 11 works fine for me; you can test with later versions):
brew install openjdk@11
image by author — jdk installation

2. After you’ve installed OpenJDK, you’ll see a few informative messages. Specifically, you need to ensure that your system can locate OpenJDK. This is where the following commands come into play:

export PATH="/usr/local/opt/openjdk@11/bin:$PATH": This command is used to add the OpenJDK binary directory to your system's PATH. The PATH is an environment variable that specifies a set of directories where executable programs are located. In this case, you're adding the OpenJDK directory to your PATH so that you can run OpenJDK from any location on your system.

export CPPFLAGS="-I/usr/local/opt/openjdk@11/include": This command sets the CPPFLAGS environment variable to include the path to the OpenJDK header files. CPPFLAGS is used by the C preprocessor and C++ compiler to specify directories to search for header files. This is necessary for compiling programs that use OpenJDK.

These commands are configuring your system to find and use OpenJDK properly. They’re typically added to a shell startup file like .bashrc or .bash_profile to make the settings permanent.

So, run the following two commands. (You may instead need to run the commands suggested at the end of the JDK installation output; the exact path depends on your Homebrew prefix, e.g. /usr/local on Intel Macs and /opt/homebrew on Apple Silicon.)

export PATH="/usr/local/opt/openjdk@11/bin:$PATH"
export CPPFLAGS="-I/usr/local/opt/openjdk@11/include"
image by author — set jdk paths

Then open a new terminal or restart the current one. Also, keep in mind that the above two commands are valid only for the current shell session. If you want to make the changes permanent, use the following two commands instead; the exports are then applied every time you start a new shell.

#If you need to have openjdk@11 first in your PATH, run:
echo 'export PATH="/opt/homebrew/opt/openjdk@11/bin:$PATH"' >> ~/.zshrc

#For compilers to find openjdk@11 you may need to set:
echo 'export CPPFLAGS="-I/opt/homebrew/opt/openjdk@11/include"' >> ~/.zshrc
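
After opening a new terminal (or sourcing ~/.zshrc), you can quickly confirm that the right JDK is being picked up; the output should mention OpenJDK 11:

# verify the JDK on the PATH
java -version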

3. Now, download and extract Kafka:

curl https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz --output kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz
image by author — download kafka and extract

4. Now go inside the kafka_2.13-3.3.1/libs folder, then download the Kafka connector for Snowflake:

cd kafka_2.13-3.3.1/libs
curl https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.9.1/snowflake-kafka-connector-1.9.1.jar --output snowflake-kafka-connector-1.9.1.jar
image by author — kafka connector jar

5. Now we are going to configure the Kafka connector for Snowflake. Since we are going to run it in standalone mode, we should create an “SF_connect.properties” file in the “kafka_2.13-3.3.1/config” folder. This file handles the connection from Kafka to Snowflake. Put the following configurations in that file according to your needs. (To avoid connectivity issues, the private key should always be on a single line.)

name=my_kafka_snowpipe_streaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=1
topics=my_kafka_topic
snowflake.topic2table.map=my_kafka_topic:my_kafka_table
buffer.count.records=1
buffer.flush.time=10
buffer.size.bytes=20000000
snowflake.url.name=https://abcdef.snowflakecomputing.com:443
snowflake.user.name=my_kafka_connector_user
snowflake.private.key=myprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekeymyprivatekey
snowflake.database.name=my_kafka_db
snowflake.schema.name=my_kafka_schema
snowflake.role.name=my_kafka_connector_role
snowflake.warehouse.name=my_kafka_warehouse
snowflake.ingestion.method=SNOWPIPE_STREAMING
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Now we have configured everything. Next we are going to start the services.

Step 4

Do each of the following in a separate terminal.

  1. Start Zookeeper:

Open a terminal and run the following:

cd ~/kafka_2.13-3.3.1

bin/zookeeper-server-start.sh config/zookeeper.properties

Then you will see ZooKeeper starting up, like the following.

Image by author-start Zookeeper

2. Start Kafka server:

Open another terminal and run the following. You will see output similar to the ZooKeeper startup.

cd ~/kafka_2.13-3.3.1
bin/kafka-server-start.sh config/server.properties
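
Optionally, before starting the connector, you can create the topic explicitly (otherwise the console producer in Step 5 will auto-create it, as long as the broker's default auto.create.topics.enable setting is still enabled):

# create the topic referenced in SF_connect.properties (one partition is enough for this test)
cd ~/kafka_2.13-3.3.1
bin/kafka-topics.sh --create --topic my_kafka_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1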

3. Start Kafka Connector:

Open another terminal and run the following; you will see output similar to the previous steps. (Here we are running Kafka Connect in standalone mode, since that is enough to test this. In production we have to run it in distributed mode; in that case, the connector configuration is prepared as a JSON file and submitted via a POST request to the Kafka Connect REST API, as sketched after the list below.)

cd ~/kafka_2.13-3.3.1
bin/connect-standalone.sh ./config/connect-standalone.properties ./config/SF_connect.properties
  • Standalone Mode in Kafka Connect is a single process system that executes all connectors and tasks. It’s simple to initiate as the configuration is included with the process. It’s beneficial for development and testing, especially when creating a Kafka Connector. However, it lacks fault tolerance and doesn’t support horizontal scaling. Monitoring is also challenging due to it being a solitary process.
  • Distributed Mode, on the other hand, operates connectors and tasks across multiple workers (servers). The configuration is submitted via a REST API, not included with the workers. It’s easy to scale up by adding more workers. The system is fault tolerant, redistributing tasks among remaining workers if a worker fails. This mode provides both fault tolerance and horizontal scalability, making it effective for production deployment of connectors.
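
For reference, submitting the same connector to Kafka Connect through its REST API looks roughly like this (a hedged sketch: the JSON simply mirrors SF_connect.properties, and it assumes Connect's REST endpoint is on the default port 8083):

# register the connector via the Kafka Connect REST API (distributed mode)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my_kafka_snowpipe_streaming",
    "config": {
      "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max": "1",
      "topics": "my_kafka_topic",
      "snowflake.topic2table.map": "my_kafka_topic:my_kafka_table",
      "snowflake.url.name": "https://abcdef.snowflakecomputing.com:443",
      "snowflake.user.name": "my_kafka_connector_user",
      "snowflake.private.key": "your-single-line-private-key",
      "snowflake.database.name": "my_kafka_db",
      "snowflake.schema.name": "my_kafka_schema",
      "snowflake.role.name": "my_kafka_connector_role",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  }'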

Step 5

Now everything is done! We are going to publish some messages to the topic and see how they are ingested into the Snowflake table in real time.

Open another terminal and run the following command.

bin/kafka-console-producer.sh --topic my_kafka_topic --bootstrap-server localhost:9092 

The kafka-console-producer.sh is a command-line tool provided by Apache Kafka. It's used to send messages to a Kafka topic from the command line. You can publish some messages to your topic like the following:

Image by author — kafka producer
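
If you prefer a non-interactive test, you can also pipe a couple of sample records straight into the producer (hypothetical sample messages; any plain strings or JSON lines work with the StringConverter configured above):

# from the ~/kafka_2.13-3.3.1 directory: send two sample JSON records to the topic
printf '%s\n' '{"event":"signup","user":"alice"}' '{"event":"purchase","user":"bob","amount":42}' \
  | bin/kafka-console-producer.sh --topic my_kafka_topic --bootstrap-server localhost:9092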

If the connection is established correctly, you will now see the published messages in your Snowflake table. The connector creates the table with two VARIANT columns: RECORD_CONTENT (the message itself) and RECORD_METADATA (topic, partition, offset, and so on).

Image by author — ingested record
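
If nothing shows up, a quick way to check the connector's health is the Kafka Connect REST API, which the standalone worker also exposes (on port 8083 by default):

# list registered connectors and check the status of ours
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/my_kafka_snowpipe_streaming/status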

Also, you can see all the messages in your topic using the following command:

bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning
Image by author — Kafka consumer

If you need more information, you can go through the official Snowflake documentation: https://docs.snowflake.com/en/user-guide/kafka-connector-install#label-kafka-connector-configuration-file

Follow me on Medium for more practical content like this!

