Kafka
Connect Kafka to your preprocessing pipeline, and use the Unstructured Ingest CLI or the Unstructured Ingest Python library to batch process all your documents and store structured outputs locally on your filesystem.
The requirements are as follows.
- A Kafka cluster, such as ones offered by the following providers:
  - Confluent Cloud (Create a cluster.)
  - Amazon Managed Streaming for Apache Kafka (Amazon MSK) (Create an Amazon MSK Provisioned cluster, or create an Amazon MSK Serverless cluster.)
  - Apache Kafka in Azure HDInsight (Create an Apache Kafka cluster in Azure HDInsight.)
  - Google Cloud Managed Service for Apache Kafka (Create a cluster.)
- The hostname and port number of the bootstrap Kafka cluster to connect to.
  - For Confluent Cloud, get the hostname and port number.
  - For Amazon MSK, get the hostname and port number.
  - For Apache Kafka in Azure HDInsight, get the hostname and port number.
  - For Google Cloud Managed Service for Apache Kafka, get the hostname and port number.
- The name of the topic to read messages from and write messages to on the cluster.
  - For Confluent Cloud, create a topic and access available topics.
  - For Amazon MSK, create a topic on an Amazon MSK Provisioned cluster, or create a topic on an Amazon MSK Serverless cluster.
  - For Apache Kafka in Azure HDInsight, create a topic and access available topics.
  - For Google Cloud Managed Service for Apache Kafka, create a topic and access available topics.
- If you use Kafka API keys and secrets for authentication, the key and secret values.
  - For Confluent Cloud, create an API key and secret.
  - For Amazon MSK, create an API key and secret.
  - For Apache Kafka in Azure HDInsight, create an API key and secret.
  - For Google Cloud Managed Service for Apache Kafka, create an API key and secret.
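Provider consoles often show the bootstrap server as a single host:port string, while the requirements above ask for the hostname and the port number as separate values. The following is a minimal sketch of splitting such a string. The function name split_bootstrap_address is hypothetical and not part of Unstructured Ingest, and taking the first broker from a comma-separated list is an assumption of this sketch.

```python
from typing import Tuple


def split_bootstrap_address(address: str) -> Tuple[str, int]:
    """Hypothetical helper: split 'host:port' into (host, port)."""
    # Assumption: when several brokers are listed, separated by commas,
    # the first one is used.
    first = address.split(",")[0].strip()
    host, sep, port = first.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"Expected 'host:port', got {address!r}")
    return host, int(port)
```

The two returned values correspond to the hostname and port number that the rest of this page refers to.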
The Kafka connector dependencies, which you can install with pip install "unstructured-ingest[kafka]".
You might also need to install additional dependencies, depending on your needs.
The following environment variables:
- KAFKA_BOOTSTRAP_SERVER - The hostname of the bootstrap Kafka cluster to connect to, represented by --bootstrap-server (CLI) or bootstrap_server (Python).
- KAFKA_PORT - The port number of the cluster, represented by --port (CLI) or port (Python).
- KAFKA_TOPIC - The unique name of the topic to read messages from and write messages to on the cluster, represented by --topic (CLI) or topic (Python).
If you use Kafka API keys and secrets for authentication:
- KAFKA_API_KEY - The Kafka API key value, represented by --kafka-api-key (CLI) or kafka_api_key (Python).
- KAFKA_SECRET - The secret value for the Kafka API key, represented by --secret (CLI) or secret (Python).
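As an illustration of how the environment variables above map onto the Python parameter names, here is a minimal sketch that reads and validates them before anything is passed to the connector. The KafkaSettings class and load_kafka_settings function are hypothetical names used only for this example, and the rule that the key and secret must be set together is an assumption of the sketch.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class KafkaSettings:
    """Hypothetical container for the connection values listed above."""
    bootstrap_server: str
    port: int
    topic: str
    kafka_api_key: Optional[str] = None
    secret: Optional[str] = None


def load_kafka_settings(env: Mapping[str, str] = os.environ) -> KafkaSettings:
    """Read and validate the Kafka variables from a mapping such as os.environ."""
    required = ("KAFKA_BOOTSTRAP_SERVER", "KAFKA_PORT", "KAFKA_TOPIC")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")

    api_key = env.get("KAFKA_API_KEY") or None
    secret = env.get("KAFKA_SECRET") or None
    # Assumption of this sketch: the key and secret are only useful as a pair.
    if (api_key is None) != (secret is None):
        raise ValueError("Set both KAFKA_API_KEY and KAFKA_SECRET, or neither")

    return KafkaSettings(
        bootstrap_server=env["KAFKA_BOOTSTRAP_SERVER"],
        port=int(env["KAFKA_PORT"]),  # raises ValueError if not an integer
        topic=env["KAFKA_TOPIC"],
        kafka_api_key=api_key,
        secret=secret,
    )
```

Failing early on a missing or malformed variable tends to give a clearer error than a connection failure later in the run.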
Additional settings include:
- --confluent (CLI) or confluent (Python): True to indicate that the cluster is running Confluent Kafka.
- --num-messages-to-consume (CLI) or num_messages_to_consume (Python): The maximum number of messages to get from the topic. The default is 1 if not otherwise specified.
- --timeout (CLI) or timeout (Python): The maximum amount of time to wait for the response of a request to the topic, expressed in seconds. The default is 1.0 if not otherwise specified.
- --group-id (CLI) or group_id (Python): The ID of the consumer group, if any, that is associated with the target Kafka cluster. (A consumer group is a way to allow a pool of consumers to divide the consumption of data over topics and partitions.) The default is default_group_id if not otherwise specified.
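To make the defaults above concrete, the following sketch merges caller-supplied values with the documented defaults and returns a dictionary keyed by the Python parameter names. The resolve_consumer_settings function and the DOCUMENTED_DEFAULTS constant are hypothetical, and the range checks are assumptions of this sketch, not behavior confirmed for the connector.

```python
from typing import Any, Dict, Optional

# Defaults as stated in the settings list above.
DOCUMENTED_DEFAULTS: Dict[str, Any] = {
    "num_messages_to_consume": 1,
    "timeout": 1.0,
    "group_id": "default_group_id",
}


def resolve_consumer_settings(
    confluent: Optional[bool] = None, **overrides: Any
) -> Dict[str, Any]:
    """Hypothetical helper: merge overrides with the documented defaults."""
    unknown = set(overrides) - set(DOCUMENTED_DEFAULTS)
    if unknown:
        raise TypeError(f"Unknown settings: {sorted(unknown)}")

    settings = {**DOCUMENTED_DEFAULTS, **overrides}
    # Assumed sanity checks; the connector itself may validate differently.
    if settings["num_messages_to_consume"] < 1:
        raise ValueError("num_messages_to_consume must be at least 1")
    if settings["timeout"] <= 0:
        raise ValueError("timeout must be a positive number of seconds")

    # No default for confluent is stated above, so it is only
    # included when the caller supplies it.
    if confluent is not None:
        settings["confluent"] = confluent
    return settings
```

For example, resolve_consumer_settings(confluent=True, num_messages_to_consume=100) keeps the default timeout and group ID while raising the message limit. Because the documented default limit is a single message, a batch run will likely want a larger value.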
Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector.
This example sends data to Unstructured API services for processing by default. To process data locally instead, see the instructions at the end of this page.
For the Unstructured Ingest CLI and the Unstructured Ingest Python library, you can use the --partition-by-api option (CLI) or partition_by_api (Python) parameter to specify where files are processed:
- To do local file processing, omit --partition-by-api (CLI) or partition_by_api (Python), or explicitly specify partition_by_api=False (Python).
  Local file processing does not use an Unstructured API key or API URL, so you can also omit the following, if they appear:
  - --api-key $UNSTRUCTURED_API_KEY (CLI) or api_key=os.getenv("UNSTRUCTURED_API_KEY") (Python)
  - --partition-endpoint $UNSTRUCTURED_API_URL (CLI) or partition_endpoint=os.getenv("UNSTRUCTURED_API_URL") (Python)
  - The environment variables UNSTRUCTURED_API_KEY and UNSTRUCTURED_API_URL
- To send files to Unstructured API services for processing, specify --partition-by-api (CLI) or partition_by_api=True (Python).
  Unstructured API services also require an Unstructured API key and API URL, which you provide by adding the following:
  - --api-key $UNSTRUCTURED_API_KEY (CLI) or api_key=os.getenv("UNSTRUCTURED_API_KEY") (Python)
  - --partition-endpoint $UNSTRUCTURED_API_URL (CLI) or partition_endpoint=os.getenv("UNSTRUCTURED_API_URL") (Python)
  - The environment variables UNSTRUCTURED_API_KEY and UNSTRUCTURED_API_URL, representing your API key and API URL, respectively.
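The two processing modes described above can be sketched as one small function that returns the partition-related parameters for either case. The partitioner_kwargs function is a hypothetical helper for illustration. Only the parameter names partition_by_api, api_key, and partition_endpoint and the two environment variable names come from this page.

```python
import os
from typing import Any, Dict, Mapping


def partitioner_kwargs(
    partition_by_api: bool, env: Mapping[str, str] = os.environ
) -> Dict[str, Any]:
    """Hypothetical helper: build the partition-related parameters."""
    if not partition_by_api:
        # Local processing: no API key or API URL is needed.
        return {"partition_by_api": False}

    api_key = env.get("UNSTRUCTURED_API_KEY")
    api_url = env.get("UNSTRUCTURED_API_URL")
    if not api_key or not api_url:
        raise ValueError(
            "UNSTRUCTURED_API_KEY and UNSTRUCTURED_API_URL must both be set "
            "when partition_by_api is True"
        )
    return {
        "partition_by_api": True,
        "api_key": api_key,
        "partition_endpoint": api_url,
    }
```

Keeping the switch in one place means that moving between local and API processing changes a single argument, and the API credentials are only required when they are actually used.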