Kafka
Batch process all your records to store structured outputs in Kafka.
The requirements are as follows.
- A Kafka cluster, such as ones offered by the following providers:
  - Confluent Cloud (Create a cluster.)
  - Amazon Managed Streaming for Apache Kafka (Amazon MSK) (Create an Amazon MSK Provisioned cluster, or create an Amazon MSK Serverless cluster.)
  - Apache Kafka in Azure HDInsight (Create an Apache Kafka cluster in Azure HDInsight.)
  - Google Cloud Managed Service for Apache Kafka (Create a cluster.)
- The hostname and port number of the bootstrap Kafka cluster to connect to.
  - For Confluent Cloud, get the hostname and port number.
  - For Amazon MSK, get the hostname and port number.
  - For Apache Kafka in Azure HDInsight, get the hostname and port number.
  - For Google Cloud Managed Service for Apache Kafka, get the hostname and port number.
- The name of the topic to read messages from and write messages to on the cluster.
  - For Confluent Cloud, create a topic and access available topics.
  - For Amazon MSK, create a topic on an Amazon MSK Provisioned cluster, or create a topic on an Amazon MSK Serverless cluster.
  - For Apache Kafka in Azure HDInsight, create a topic and access available topics.
  - For Google Cloud Managed Service for Apache Kafka, create a topic and access available topics.
- If you use Kafka API keys and secrets for authentication, the key and secret values.
  - For Confluent Cloud, create an API key and secret.
  - For Amazon MSK, create an API key and secret.
  - For Apache Kafka in Azure HDInsight, create an API key and secret.
  - For Google Cloud Managed Service for Apache Kafka, create an API key and secret.
The Kafka connector dependencies:
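For example, if you use the Unstructured Ingest Python library, the Kafka extra can typically be installed with pip (the extra name is an assumption here; confirm it against the dependencies reference):

```
pip install "unstructured-ingest[kafka]"
```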
You might also need to install additional dependencies, depending on your needs. Learn more.
The following environment variables:
- `KAFKA_BOOTSTRAP_SERVER` - The hostname of the bootstrap Kafka cluster to connect to, represented by `--bootstrap-server` (CLI) or `bootstrap_server` (Python).
- `KAFKA_PORT` - The port number of the cluster, represented by `--port` (CLI) or `port` (Python).
- `KAFKA_TOPIC` - The unique name of the topic to read messages from and write messages to on the cluster, represented by `--topic` (CLI) or `topic` (Python).
If you use Kafka API keys and secrets for authentication:
- `KAFKA_API_KEY` - The Kafka API key value, represented by `--kafka-api-key` (CLI) or `kafka_api_key` (Python).
- `KAFKA_SECRET` - The secret value for the Kafka API key, represented by `--secret` (CLI) or `secret` (Python).
Additional settings include:
- `--confluent` (CLI) or `confluent` (Python): True to indicate that the cluster is running Confluent Kafka.
- `--num-messages-to-consume` (CLI) or `num_messages_to_consume` (Python): The maximum number of messages to get from the topic. The default is `1` if not otherwise specified.
- `--timeout` (CLI) or `timeout` (Python): The maximum amount of time, in seconds, to wait for the response of a request to the topic. The default is `1.0` if not otherwise specified.
- `--group-id` (CLI) or `group_id` (Python): The ID of the consumer group, if any, that is associated with the target Kafka cluster. (A consumer group lets a pool of consumers divide the consumption of data over topics and partitions.) The default is `default_group_id` if not otherwise specified.
Also set the following environment variables:
- `UNSTRUCTURED_API_KEY` - Your Unstructured API key value.
- `UNSTRUCTURED_API_URL` - Your Unstructured API URL.
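Before you run anything, a quick check like the following (a minimal Python sketch; the variable names match the lists above) can confirm that everything is set:

```python
import os

# Required for the Kafka connector and the Unstructured API.
required = [
    "KAFKA_BOOTSTRAP_SERVER",
    "KAFKA_PORT",
    "KAFKA_TOPIC",
    "UNSTRUCTURED_API_KEY",
    "UNSTRUCTURED_API_URL",
]

# If either key-based authentication variable is set, both must be.
if os.getenv("KAFKA_API_KEY") or os.getenv("KAFKA_SECRET"):
    required += ["KAFKA_API_KEY", "KAFKA_SECRET"]

missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit("Missing environment variables: " + ", ".join(missing))
```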
Now call the Unstructured CLI or Python SDK. The source connector can be any of the ones supported. This example uses the local source connector:
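A minimal Python sketch of that call follows, assuming the unstructured-ingest v2 Pipeline API. The Kafka import path and the `CloudKafka*` config class names are assumptions based on that API's naming pattern, and `LOCAL_FILE_INPUT_DIR` is a hypothetical variable naming the local input directory; check the connector reference for the exact names.

```python
import os

# NOTE: This is a sketch. The Kafka import path and the CloudKafka* class
# names are assumptions; verify them against the connector reference.
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.connectors.kafka.cloud import (
    CloudKafkaConnectionConfig,
    CloudKafkaAccessConfig,
    CloudKafkaUploaderConfig,
)

if __name__ == "__main__":
    Pipeline.from_configs(
        context=ProcessorConfig(),
        # Local source connector: LOCAL_FILE_INPUT_DIR is a hypothetical
        # variable naming the directory of files to process.
        indexer_config=LocalIndexerConfig(input_path=os.environ["LOCAL_FILE_INPUT_DIR"]),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),
        # Partition the files by calling the Unstructured API.
        partitioner_config=PartitionerConfig(
            partition_by_api=True,
            api_key=os.environ["UNSTRUCTURED_API_KEY"],
            partition_endpoint=os.environ["UNSTRUCTURED_API_URL"],
        ),
        # Kafka destination connector: write the structured output to the topic.
        destination_connection_config=CloudKafkaConnectionConfig(
            bootstrap_server=os.environ["KAFKA_BOOTSTRAP_SERVER"],
            port=int(os.environ["KAFKA_PORT"]),
            access_config=CloudKafkaAccessConfig(
                kafka_api_key=os.environ["KAFKA_API_KEY"],
                secret=os.environ["KAFKA_SECRET"],
            ),
        ),
        uploader_config=CloudKafkaUploaderConfig(topic=os.environ["KAFKA_TOPIC"]),
        # The optional settings described above (confluent, num_messages_to_consume,
        # timeout, group_id) are passed as additional keyword arguments on the
        # Kafka config classes.
    ).run()
```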