Apache Flink UDFs
Kryptonite for Kafka provides multiple Flink user-defined functions (UDFs) for encrypting and decrypting column values in Flink Table API and Flink SQL jobs.
Field-Level Encryption with UDFs in Flink SQL
Field-Level Decryption with UDFs in Flink SQL
UDFs Overview
Probabilistic / Deterministic Encryption
| UDF | Description |
|---|---|
| k4k_encrypt | Encrypt scalar or complex values |
| k4k_decrypt_with_schema | Decrypt scalar or complex values using a schema string literal for the expected target type |
| k4k_encrypt_array | Encrypt ARRAY elements individually |
| k4k_decrypt_array_with_schema | Decrypt ARRAY elements individually using a schema string literal for the expected target type |
| k4k_encrypt_map | Encrypt MAP values individually |
| k4k_decrypt_map_with_schema | Decrypt MAP values individually using a schema string literal for the expected target type |
| k4k_encrypt_row | Encrypt ROW fields individually |
| k4k_decrypt_row_with_schema | Decrypt ROW fields individually using a schema string literal for the expected target type |
| k4k_decrypt (deprecated) | Decrypt scalar or complex values using an exemplary type capture value for type inference. Use k4k_decrypt_with_schema instead. |
| k4k_decrypt_array (deprecated) | Decrypt ARRAY elements using an exemplary type capture value for type inference. Use k4k_decrypt_array_with_schema instead. |
| k4k_decrypt_map (deprecated) | Decrypt MAP values using an exemplary type capture value for type inference. Use k4k_decrypt_map_with_schema instead. |
Format-Preserving Encryption
| UDF | Description |
|---|---|
| k4k_encrypt_fpe | FPE encrypt scalar values |
| k4k_decrypt_fpe | FPE decrypt scalar values |
| k4k_encrypt_array_fpe | FPE encrypt ARRAY elements |
| k4k_decrypt_array_fpe | FPE decrypt ARRAY elements |
| k4k_encrypt_map_fpe | FPE encrypt MAP values |
| k4k_decrypt_map_fpe | FPE decrypt MAP values |
Installation / Deployment
- Download the UDF JAR from the GitHub Releases page.
- Place it in the Flink libraries directory scanned during cluster startup.
- Register the functions in your Flink catalog before use.
Function registration
Run this either in a Flink SQL session or as part of your SQL client initialisation:
ADD JAR '<FULL_PATH_TO_FLINK_UDFS_KRYPTONITE_JAR>';
CREATE FUNCTION k4k_encrypt AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_with_schema AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptWithSchemaUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_encrypt_array AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptArrayUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_array_with_schema AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptArrayWithSchemaUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_encrypt_map AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptMapUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_map_with_schema AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptMapWithSchemaUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_encrypt_row AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptRowUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_row_with_schema AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptRowWithSchemaUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_encrypt_fpe AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptFpeUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_fpe AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptFpeUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_encrypt_array_fpe AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptArrayFpeUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_array_fpe AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptArrayFpeUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_encrypt_map_fpe AS 'com.github.hpgrahsl.flink.functions.kryptonite.EncryptMapFpeUdf' LANGUAGE JAVA;
CREATE FUNCTION k4k_decrypt_map_fpe AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptMapFpeUdf' LANGUAGE JAVA;
-- NOTE: The following functions have been deprecated. Please use the corresponding
-- "*_with_schema" function alternatives instead. Registering and using these functions
-- in new projects is strongly discouraged due to their upcoming removal!
-- CREATE FUNCTION k4k_decrypt AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptUdf' LANGUAGE JAVA;
-- CREATE FUNCTION k4k_decrypt_array AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptArrayUdf' LANGUAGE JAVA;
-- CREATE FUNCTION k4k_decrypt_map AS 'com.github.hpgrahsl.flink.functions.kryptonite.DecryptMapUdf' LANGUAGE JAVA;
Verify:
SHOW USER FUNCTIONS;
-- Expected to list all successfully registered k4k_* functions (and potentially other registered user functions as well)
Configuration
Any UDF-specific configuration is passed as environment variables to the Flink TaskManagers (or alternatively in flink-conf.yaml):
environment:
- cipher_data_keys=[{"identifier":"my-key","material":{"primaryKeyId":1234567890,"key":[{"keyData":{"typeUrl":"type.googleapis.com/google.crypto.tink.AesGcmKey","value":"<BASE64_KEY>","keyMaterialType":"SYMMETRIC"},"status":"ENABLED","keyId":1234567890,"outputPrefixType":"TINK"}]}}]
- cipher_data_key_identifier=my-key
For KMS-backed or encrypted keysets, additionally set key_source, kms_type, kms_config, kek_type, kek_uri, kek_config. See the Configuration Reference for further details.
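Because cipher_data_keys carries a whole keyset as one line of JSON, it is easy to break when edited by hand. The following Python sketch assembles both environment entries from the structure of the example above. The helper name `build_cipher_env` is hypothetical and not part of Kryptonite for Kafka. The sketch does not generate key material, so `<BASE64_KEY>` remains a placeholder for a real Tink key.

```python
import json


def build_cipher_env(identifier: str, key_id: int, base64_key: str) -> list[str]:
    """Return KEY=VALUE entries for one AES-GCM Tink keyset (hypothetical helper).

    Mirrors the cipher_data_keys example; it does not create key material.
    """
    keyset = {
        "primaryKeyId": key_id,
        "key": [
            {
                "keyData": {
                    "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey",
                    "value": base64_key,
                    "keyMaterialType": "SYMMETRIC",
                },
                "status": "ENABLED",
                "keyId": key_id,
                "outputPrefixType": "TINK",
            }
        ],
    }
    data_keys = [{"identifier": identifier, "material": keyset}]
    # Compact separators produce single-line JSON without ": " sequences
    return [
        "cipher_data_keys=" + json.dumps(data_keys, separators=(",", ":")),
        "cipher_data_key_identifier=" + identifier,
    ]


if __name__ == "__main__":
    for entry in build_cipher_env("my-key", 1234567890, "<BASE64_KEY>"):
        print(entry)
```

The compact separators matter if the entry is pasted unquoted into a YAML list such as the `environment:` block above. YAML treats a colon followed by a space inside a plain scalar as a mapping indicator.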
UDF Signatures
k4k_encrypt / k4k_decrypt
-- Encrypt with configured defaults
K4K_ENCRYPT(data T) → VARCHAR
-- Encrypt with explicit key identifier and algorithm
K4K_ENCRYPT(data T, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR) → VARCHAR
-- Decrypt (deprecated; typeCapture is an exemplary value used for type inference)
K4K_DECRYPT(data VARCHAR, typeCapture T) → T
k4k_decrypt_with_schema
An alternative to k4k_decrypt that takes a Flink SQL type string literal instead of a type capture value. The schema string must be a compile-time string literal; column references or runtime expressions are not accepted.
-- schemaString examples: 'STRING', 'INT', 'BIGINT', 'FLOAT', 'DOUBLE', 'BOOLEAN', 'BYTES', 'ARRAY<STRING>', 'MAP<STRING, INT>', 'ROW<somestring STRING, someint INT>'
K4K_DECRYPT_WITH_SCHEMA(data VARCHAR, schemaString VARCHAR) → T
k4k_encrypt_array / k4k_decrypt_array
K4K_ENCRYPT_ARRAY(data ARRAY<T>) → ARRAY<VARCHAR>
K4K_ENCRYPT_ARRAY(data ARRAY<T>, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR) → ARRAY<VARCHAR>
-- Deprecated: typeCapture is a single exemplary element value used for type inference
K4K_DECRYPT_ARRAY(data ARRAY<VARCHAR>, typeCapture T) → ARRAY<T>
k4k_decrypt_array_with_schema
An alternative to k4k_decrypt_array that takes a Flink SQL type string literal instead of a type capture value. The schema string must start with ARRAY<...> and must be a compile-time literal.
-- schemaString examples: 'ARRAY<STRING>', 'ARRAY<INT>', 'ARRAY<BIGINT>', 'ARRAY<DOUBLE>', 'ARRAY<BOOLEAN>'
K4K_DECRYPT_ARRAY_WITH_SCHEMA(data ARRAY<VARCHAR>, schemaString VARCHAR) → ARRAY<T>
k4k_encrypt_map / k4k_decrypt_map
K4K_ENCRYPT_MAP(data MAP<K,V>) → MAP<K,VARCHAR>
K4K_ENCRYPT_MAP(data MAP<K,V>, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR) → MAP<K,VARCHAR>
-- Deprecated: typeCapture is a single exemplary map value used for type inference
K4K_DECRYPT_MAP(data MAP<K,VARCHAR>, typeCapture V) → MAP<K,V>
k4k_decrypt_map_with_schema
An alternative to k4k_decrypt_map that takes a Flink SQL type string literal instead of a type capture value. The schema string must start with MAP<...> and must be a compile-time literal.
-- schemaString examples: 'MAP<STRING, STRING>', 'MAP<STRING, INT>', 'MAP<INT, STRING>', 'MAP<STRING, BOOLEAN>'
K4K_DECRYPT_MAP_WITH_SCHEMA(data MAP<K,VARCHAR>, schemaString VARCHAR) → MAP<K,V>
k4k_encrypt_row
Encrypts ROW fields individually and returns each encrypted field as VARCHAR. The optional fieldList parameter is a comma-separated list of the field names to encrypt and must be a string literal. Fields not in fieldList retain their original types. Omitting fieldList encrypts all fields.
-- Encrypt all fields with configured defaults
K4K_ENCRYPT_ROW(data ROW<...>) → ROW<...(all fields as VARCHAR)...>
-- Encrypt only specific fields with configured defaults
K4K_ENCRYPT_ROW(data ROW<...>, fieldList VARCHAR) → ROW<...>
-- Encrypt all fields with explicit key identifier and algorithm
K4K_ENCRYPT_ROW(data ROW<...>, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR) → ROW<...(all fields as VARCHAR)...>
-- Encrypt only specific fields with explicit key identifier and algorithm
K4K_ENCRYPT_ROW(data ROW<...>, fieldList VARCHAR, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR) → ROW<...>
k4k_decrypt_row_with_schema
Decrypts ROW fields using a schema string literal that describes the target row type after decryption. The schema string must start with ROW<...> or ROW(...) and must be a compile-time literal. The optional fieldList parameter (comma-separated field names) limits decryption to specific fields; fields not in the list are passed through unchanged.
-- Decrypt all fields
K4K_DECRYPT_ROW_WITH_SCHEMA(data ROW<...(all VARCHAR)...>, schemaString VARCHAR) → ROW<...>
-- Decrypt only specific fields (others are passed through as-is)
K4K_DECRYPT_ROW_WITH_SCHEMA(data ROW<...>, schemaString VARCHAR, fieldList VARCHAR) → ROW<...>
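The fieldList rules for k4k_encrypt_row and k4k_decrypt_row_with_schema only affect which fields change type. The Python sketch below models that type behavior. A row is represented as an ordered list of (name, type) pairs, and no cryptography is involved. It illustrates the documented rules and is not Kryptonite's implementation; the function names are hypothetical. It splits fieldList on commas exactly as the examples write it ('id,score'). Whether the UDFs tolerate spaces after the commas is not covered here.

```python
from typing import Optional

Field = tuple[str, str]  # (field name, Flink SQL type); hypothetical representation


def parse_field_list(field_list: Optional[str], row: list[Field]) -> set[str]:
    """An omitted fieldList selects all fields; otherwise split the literal on commas."""
    if field_list is None:
        return {name for name, _ in row}
    return set(field_list.split(","))


def encrypt_row_type(row: list[Field], field_list: Optional[str] = None) -> list[Field]:
    """Result type of K4K_ENCRYPT_ROW: selected fields become VARCHAR, others keep their type."""
    selected = parse_field_list(field_list, row)
    return [(name, "VARCHAR" if name in selected else typ) for name, typ in row]


def decrypt_row_type(encrypted: list[Field], schema: list[Field],
                     field_list: Optional[str] = None) -> list[Field]:
    """Result type of K4K_DECRYPT_ROW_WITH_SCHEMA: selected fields take their type
    from the schema string, fields outside fieldList pass through unchanged."""
    selected = parse_field_list(field_list, encrypted)
    target = dict(schema)
    return [(name, target[name] if name in selected else typ) for name, typ in encrypted]


if __name__ == "__main__":
    people = [("name", "STRING"), ("age", "INT"), ("active", "BOOLEAN")]
    encrypted = encrypt_row_type(people, "name")
    print(encrypted)
    print(decrypt_row_type(encrypted, people, "name"))
```

When the same fieldList is used on both sides, decryption restores the original row type. Only the listed fields make the round trip through VARCHAR.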
Note
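The schema string parameter of all *_with_schema decryption functions (k4k_decrypt_with_schema, k4k_decrypt_array_with_schema, k4k_decrypt_map_with_schema, k4k_decrypt_row_with_schema) MUST always be a string literal. Column references or runtime expressions are not supported.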
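The prefix rules from the signatures above can be checked before a job is submitted, for example in a script that lints SQL files. This Python sketch is hypothetical. It only compares the leading keyword that each *_with_schema function documents, and it treats keywords case-insensitively (an assumption). It does not validate the full type string the way Flink does.

```python
# Documented leading keywords per function. k4k_decrypt_with_schema accepts any
# Flink SQL type string, so no prefix rule is listed for it.
REQUIRED_PREFIXES = {
    "k4k_decrypt_array_with_schema": ("ARRAY<",),
    "k4k_decrypt_map_with_schema": ("MAP<",),
    "k4k_decrypt_row_with_schema": ("ROW<", "ROW("),
}


def check_schema_literal(function: str, schema: str) -> None:
    """Raise ValueError if the schema literal cannot match the function's documented form."""
    name = function.lower()
    prefixes = REQUIRED_PREFIXES.get(name)
    if prefixes is None:
        return  # no prefix rule documented for this function
    # Case-insensitive comparison is an assumption of this sketch
    if not schema.strip().upper().startswith(prefixes):
        raise ValueError(
            f"{name} expects a schema starting with {' or '.join(prefixes)}, got {schema!r}"
        )


if __name__ == "__main__":
    check_schema_literal("K4K_DECRYPT_ARRAY_WITH_SCHEMA", "ARRAY<STRING>")
    try:
        check_schema_literal("k4k_decrypt_map_with_schema", "ARRAY<STRING>")
    except ValueError as err:
        print(err)
```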
FPE variants
K4K_ENCRYPT_FPE(data VARCHAR) → VARCHAR
K4K_ENCRYPT_FPE(data VARCHAR, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR) → VARCHAR
K4K_ENCRYPT_FPE(data VARCHAR, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR, fpeTweak VARCHAR) → VARCHAR
K4K_ENCRYPT_FPE(data VARCHAR, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR, fpeTweak VARCHAR, fpeAlphabetType VARCHAR) → VARCHAR
K4K_ENCRYPT_FPE(data VARCHAR, keyIdentifier VARCHAR, cipherAlgorithm VARCHAR, fpeTweak VARCHAR, fpeAlphabetType VARCHAR, fpeAlphabetCustom VARCHAR) → VARCHAR
K4K_DECRYPT_FPE and the *_ARRAY_FPE / *_MAP_FPE variants follow the same pattern.
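Format-preserving encryption keeps the ciphertext in the same alphabet and at the same length as the plaintext. An FPE-encrypted 16-digit card number is therefore again a 16-digit string. The following Python sketch is a sanity check you might run against query results; it does not implement FPE. It makes three assumptions: the DIGITS alphabet type means the characters 0-9, a custom alphabet (fpeAlphabetCustom) is given as a string of allowed characters, and "CUSTOM" is the name of the custom alphabet type.

```python
import string

# Assumed alphabets; only DIGITS appears in this document's examples.
ALPHABETS = {"DIGITS": string.digits}


def preserves_format(plaintext: str, ciphertext: str,
                     alphabet_type: str = "DIGITS", custom_alphabet: str = "") -> bool:
    """Check that ciphertext has the plaintext's length and stays within the alphabet."""
    # "CUSTOM" as the type name for fpeAlphabetCustom is an assumption of this sketch
    alphabet = custom_alphabet if alphabet_type == "CUSTOM" else ALPHABETS[alphabet_type]
    allowed = set(alphabet)
    return (len(ciphertext) == len(plaintext)
            and set(plaintext) <= allowed
            and set(ciphertext) <= allowed)
```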
Examples
Object mode encryption and decryption
-- Table to store encrypted records (all columns as VARCHAR in object mode)
CREATE TABLE my_data_encrypted_o (
id VARCHAR,
mystring VARCHAR,
myint VARCHAR,
myboolean VARCHAR,
mysubdoc1 VARCHAR,
myarray VARCHAR,
mysubdoc2 VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'my_data_encrypted_o',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- Encrypt on insert
INSERT INTO my_data_encrypted_o VALUES (
K4K_ENCRYPT('1234567890'),
K4K_ENCRYPT('some foo text'),
K4K_ENCRYPT(42),
K4K_ENCRYPT(true),
K4K_ENCRYPT(ROW('As I was going to St. Ives', 1234)),
K4K_ENCRYPT(ARRAY['str_1', 'str_2', 'str_3']),
K4K_ENCRYPT(MAP['k1', 9, 'k2', 8, 'k3', 7])
);
-- Decrypt using schema strings
SELECT
K4K_DECRYPT_WITH_SCHEMA(id, 'STRING') AS id,
K4K_DECRYPT_WITH_SCHEMA(mystring, 'STRING') AS mystring,
K4K_DECRYPT_WITH_SCHEMA(myint, 'INT') AS myint,
K4K_DECRYPT_WITH_SCHEMA(myboolean, 'BOOLEAN') AS myboolean,
K4K_DECRYPT_WITH_SCHEMA(mysubdoc1, 'ROW<somestring STRING, someint INT>') AS mysubdoc1,
K4K_DECRYPT_WITH_SCHEMA(myarray, 'ARRAY<STRING>') AS myarray,
K4K_DECRYPT_WITH_SCHEMA(mysubdoc2, 'MAP<STRING, INT>') AS mysubdoc2
FROM my_data_encrypted_o LIMIT 1;
Element mode encryption and decryption
-- Table with element-wise encrypted complex types
CREATE TABLE my_data_encrypted_e (
id VARCHAR,
mystring VARCHAR,
myint VARCHAR,
myboolean VARCHAR,
mysubdoc1 ROW<somestring VARCHAR, someint VARCHAR>,
myarray ARRAY<VARCHAR>,
mysubdoc2 MAP<VARCHAR, VARCHAR>
) WITH (
'connector' = 'kafka',
'topic' = 'my_data_encrypted_e',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- Encrypt using k4k_encrypt_row, k4k_encrypt_array, k4k_encrypt_map for complex types
INSERT INTO my_data_encrypted_e VALUES (
K4K_ENCRYPT('1234567890'),
K4K_ENCRYPT('some foo text'),
K4K_ENCRYPT(42),
K4K_ENCRYPT(true),
K4K_ENCRYPT_ROW(ROW('As I was going to St. Ives', 1234)),
K4K_ENCRYPT_ARRAY(ARRAY['str_1', 'str_2', 'str_3']),
K4K_ENCRYPT_MAP(MAP['k1', 9, 'k2', 8, 'k3', 7])
);
-- Decrypt using schema strings
SELECT
K4K_DECRYPT_WITH_SCHEMA(id, 'STRING') AS id,
K4K_DECRYPT_WITH_SCHEMA(mystring, 'STRING') AS mystring,
K4K_DECRYPT_WITH_SCHEMA(myint, 'INT') AS myint,
K4K_DECRYPT_WITH_SCHEMA(myboolean, 'BOOLEAN') AS myboolean,
K4K_DECRYPT_ROW_WITH_SCHEMA(mysubdoc1, 'ROW<somestring STRING, someint INT>') AS mysubdoc1,
K4K_DECRYPT_ARRAY_WITH_SCHEMA(myarray, 'ARRAY<STRING>') AS myarray,
K4K_DECRYPT_MAP_WITH_SCHEMA(mysubdoc2, 'MAP<STRING, INT>') AS mysubdoc2
FROM my_data_encrypted_e LIMIT 1;
Partial ROW field encryption and decryption
k4k_encrypt_row and k4k_decrypt_row_with_schema both accept an optional fieldList parameter (comma-separated field names) to process only a subset of fields. Fields not listed are passed through unchanged.
-- Encrypt only 'name'; 'age' and 'active' remain plaintext
INSERT INTO my_people_encrypted
SELECT K4K_ENCRYPT_ROW(myrow, 'name') AS myrow
FROM my_people_source;
-- Decrypt only 'name'; 'age' and 'active' pass through as-is
SELECT
K4K_DECRYPT_ROW_WITH_SCHEMA(myrow, 'ROW<name STRING, age INT, active BOOLEAN>', 'name') AS myrow
FROM my_people_encrypted LIMIT 1;
-- Encrypt and decrypt multiple specific fields
INSERT INTO my_records_encrypted
SELECT K4K_ENCRYPT_ROW(myrow, 'id,score') AS myrow
FROM my_records_source;
SELECT
K4K_DECRYPT_ROW_WITH_SCHEMA(myrow, 'ROW<id STRING, count INT, score DOUBLE, enabled BOOLEAN>', 'id,score') AS myrow
FROM my_records_encrypted LIMIT 1;
FPE encryption
INSERT INTO customer_fpe_encrypted
SELECT
customer_id,
K4K_ENCRYPT_FPE(credit_card_number, 'myFpeKey', 'CUSTOM/MYSTO_FPE_FF3_1', 'tweakAB', 'DIGITS') AS credit_card_number,
K4K_ENCRYPT_FPE(ssn, 'myFpeKey', 'CUSTOM/MYSTO_FPE_FF3_1', 'tweakXY', 'DIGITS') AS ssn
FROM customer_plaintext;
Complex Data Type Mapping
| Original type | Object mode (k4k_encrypt) | Element mode (k4k_encrypt_array, k4k_encrypt_map, k4k_encrypt_row) |
|---|---|---|
| ARRAY<T> | VARCHAR | ARRAY<VARCHAR> using k4k_encrypt_array |
| MAP<K,V> | VARCHAR | MAP<K,VARCHAR> using k4k_encrypt_map |
| ROW<...> | VARCHAR | ROW<...(all fields VARCHAR)...> using k4k_encrypt_row |
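The mapping can also be expressed as a function, for example to derive sink-table column types from a source schema. This Python sketch is hypothetical. Flink types are represented as nested tuples rather than parsed from type strings, and ROW fields are all encrypted (no fieldList). Scalars map to VARCHAR, as in the example sink tables above.

```python
from typing import Union

# Hypothetical type representation:
#   "INT", "STRING", ...            scalar types
#   ("ARRAY", elem)                 ARRAY<elem>
#   ("MAP", key, value)             MAP<key, value>
#   ("ROW", [(name, type), ...])    ROW<name type, ...>
FlinkType = Union[str, tuple]


def encrypted_type(original: FlinkType, mode: str) -> FlinkType:
    """Column type after encryption in 'object' or 'element' mode, per the mapping table."""
    if mode not in ("object", "element"):
        raise ValueError(f"unknown mode: {mode}")
    if mode == "object" or isinstance(original, str):
        return "VARCHAR"  # k4k_encrypt yields a single VARCHAR value
    kind = original[0]
    if kind == "ARRAY":  # k4k_encrypt_array encrypts each element
        return ("ARRAY", "VARCHAR")
    if kind == "MAP":  # k4k_encrypt_map keeps the key type, encrypts values
        return ("MAP", original[1], "VARCHAR")
    if kind == "ROW":  # k4k_encrypt_row without fieldList encrypts every field
        return ("ROW", [(name, "VARCHAR") for name, _ in original[1]])
    raise ValueError(f"unsupported type: {original!r}")


def render(t: FlinkType) -> str:
    """Render the tuple form back into a Flink SQL type string."""
    if isinstance(t, str):
        return t
    if t[0] == "ARRAY":
        return f"ARRAY<{render(t[1])}>"
    if t[0] == "MAP":
        return f"MAP<{render(t[1])}, {render(t[2])}>"
    return "ROW<" + ", ".join(f"{n} {render(ft)}" for n, ft in t[1]) + ">"


if __name__ == "__main__":
    subdoc1 = ("ROW", [("somestring", "STRING"), ("someint", "INT")])
    print(render(encrypted_type(subdoc1, "element")))
    print(render(encrypted_type(subdoc1, "object")))
```

Applied to the columns of the element-mode example, the function yields the column types declared in `my_data_encrypted_e`.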

