The tap-kafka extractor pulls data from Apache Kafka so that it can then be sent to a destination using a loader.
Airbyte Usage Notice
Container-based connectors can introduce deployment challenges, including the potential need to run Docker-in-Docker (not currently supported by services like AWS ECS, Meltano Cloud, etc.; see the FAQ and Airbyte's ECS deployment docs for more details). Before using this variant, we recommend considering whether and how you will be able to deploy container-based connectors to production.
For more context on how this Airbyte integration works, please check out the FAQ in the Meltano Docs.
Alternate Implementations
- Airbyte 🥈
- Gadget 🥈
- isabella232
- Peter Kosztolanyi
- Peter Begle
- Stitch Data 🥉
- Steve Hanna
- Wise (default) 🥈
Getting Started
Prerequisites
If you haven't already, follow the initial steps of the Getting Started guide.
Installation and configuration
- Add the tap-kafka extractor to your project using meltano add:
meltano add extractor tap-kafka --variant airbyte
- Configure the tap-kafka settings using meltano config:
meltano config tap-kafka set --interactive
- Test that extractor settings are valid using meltano config:
meltano config tap-kafka test
Next steps
Follow the remaining steps of the Getting Started guide.
If you run into any issues, learn how to get help.
Capabilities
The current capabilities for tap-kafka may have been automatically set when it was originally added to the Hub. Please review the capabilities when using this extractor. If you find they are out of date, please consider updating them by making a pull request to the YAML file that defines the capabilities for this extractor.
This plugin has the following capabilities:
- about
- catalog
- discover
- schema-flattening
- state
- stream-maps
You can override these capabilities or specify additional ones in your meltano.yml by adding the capabilities key.
Settings
The tap-kafka settings that are known to Meltano are documented below. To quickly find the setting you're looking for, scan the list below:
airbyte_config.MessageFormat.deserialization_strategy
airbyte_config.MessageFormat.deserialization_type
airbyte_config.MessageFormat.schema_registry_password
airbyte_config.MessageFormat.schema_registry_url
airbyte_config.MessageFormat.schema_registry_username
airbyte_config.auto_commit_interval_ms
airbyte_config.auto_offset_reset
airbyte_config.bootstrap_servers
airbyte_config.client_dns_lookup
airbyte_config.client_id
airbyte_config.enable_auto_commit
airbyte_config.group_id
airbyte_config.max_poll_records
airbyte_config.max_records_process
airbyte_config.polling_time
airbyte_config.protocol.sasl_jaas_config
airbyte_config.protocol.sasl_mechanism
airbyte_config.protocol.security_protocol
airbyte_config.receive_buffer_bytes
airbyte_config.repeated_calls
airbyte_config.request_timeout_ms
airbyte_config.retry_backoff_ms
airbyte_config.subscription.subscription_type
airbyte_config.subscription.topic_partitions
airbyte_config.subscription.topic_pattern
airbyte_config.test_topic
airbyte_spec.image
airbyte_spec.tag
docker_mounts
SDK settings:
flattening_enabled
flattening_max_depth
stream_map_config
stream_maps
You can also list these settings using the meltano config list subcommand:
meltano config tap-kafka list
You can override these settings or specify additional ones in your meltano.yml by adding the settings key.
Please consider adding any settings you have defined locally to this definition on MeltanoHub by making a pull request to the YAML file that defines the settings for this plugin.
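Every setting below lists a matching environment variable. Judging only from the pairs documented on this page, the variable name appears to be the plugin name followed by the setting name, upper-cased, with dashes and dots turned into underscores. The Python sketch below reproduces that pattern; setting_to_env_var is a hypothetical helper inferred from these examples, not part of Meltano's API, and Meltano's own naming rules may cover more cases than this.

```python
def setting_to_env_var(plugin_name: str, setting_name: str) -> str:
    """Hypothetical helper: derive an env var name following this page's examples."""
    # Plugin names use dashes; environment variable names use underscores.
    prefix = plugin_name.replace("-", "_").upper()
    # Nested setting names use dots; flatten them into underscores as well.
    suffix = setting_name.replace(".", "_").upper()
    return f"{prefix}_{suffix}"


# Reproduce a few of the name pairs documented on this page.
for name in (
    "airbyte_config.MessageFormat.deserialization_strategy",
    "airbyte_spec.tag",
    "stream_maps",
):
    print(name, "->", setting_to_env_var("tap-kafka", name))
```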
Airbyte Config Messageformat Deserialization Strategy (airbyte_config.MessageFormat.deserialization_strategy)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_DESERIALIZATION_STRATEGY
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config MessageFormat.deserialization_strategy [value]
Airbyte Config Messageformat Deserialization Type (airbyte_config.MessageFormat.deserialization_type)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_DESERIALIZATION_TYPE
Options: JSON, AVRO
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config MessageFormat.deserialization_type [value]
Airbyte Config Messageformat Schema Registry Password (airbyte_config.MessageFormat.schema_registry_password)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_SCHEMA_REGISTRY_PASSWORD
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_password [value]
Airbyte Config Messageformat Schema Registry Url (airbyte_config.MessageFormat.schema_registry_url)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_SCHEMA_REGISTRY_URL
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_url [value]
Airbyte Config Messageformat Schema Registry Username (airbyte_config.MessageFormat.schema_registry_username)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MESSAGEFORMAT_SCHEMA_REGISTRY_USERNAME
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config MessageFormat.schema_registry_username [value]
Airbyte Config Auto Commit Interval Ms (airbyte_config.auto_commit_interval_ms)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_AUTO_COMMIT_INTERVAL_MS
The frequency in milliseconds at which the consumer offsets are auto-committed to Kafka if enable_auto_commit is set to true.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config auto_commit_interval_ms [value]
Airbyte Config Auto Offset Reset (airbyte_config.auto_offset_reset)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_AUTO_OFFSET_RESET
What to do when there is no initial offset in Kafka, or if the current offset no longer exists on the server:
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw an exception to the consumer if no previous offset is found for the consumer's group
- anything else: throw an exception to the consumer
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config auto_offset_reset [value]
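The auto_offset_reset rules above can be modeled per partition as a small decision function. This is a simplified Python sketch, not the Kafka client's implementation: offsets are plain integers, earliest and latest stand for the oldest retained offset and the log-end offset, and NoOffsetForPartition and starting_offset are hypothetical names.

```python
from typing import Optional


class NoOffsetForPartition(Exception):
    """Raised when no usable offset exists and the policy is 'none' (or unknown)."""


def starting_offset(policy: str, committed: Optional[int], earliest: int, latest: int) -> int:
    """Pick the offset a consumer starts from on one partition.

    committed: the group's committed offset, or None if the group has none.
    earliest/latest: the oldest retained offset and the log-end offset.
    """
    # A committed offset still inside the retained range is used unchanged.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # Otherwise the reset policy decides.
    if policy == "earliest":
        return earliest
    if policy == "latest":
        return latest
    # 'none' and any other value surface an error to the consumer.
    raise NoOffsetForPartition(f"no valid offset and auto_offset_reset={policy!r}")


print(starting_offset("earliest", None, earliest=10, latest=50))  # 10
print(starting_offset("latest", None, earliest=10, latest=50))    # 50
```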
Airbyte Config Bootstrap Servers (airbyte_config.bootstrap_servers)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_BOOTSTRAP_SERVERS
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping; this list only affects the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are only used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config bootstrap_servers [value]
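Because bootstrap_servers is a single comma-separated string, one way to sanity-check a value before running the extractor is to split it into host/port pairs. A minimal Python sketch; parse_bootstrap_servers and the broker hostnames are hypothetical.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a 'host1:port1,host2:port2,...' string into (host, port) pairs."""
    servers: List[Tuple[str, int]] = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray or trailing commas
        # Split on the last ':' so everything before it is treated as the host.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


# Two brokers, so bootstrapping still works if one of them is down.
print(parse_bootstrap_servers("broker1:9092,broker2:9092"))
```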
Airbyte Config Client Dns Lookup (airbyte_config.client_dns_lookup)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_CLIENT_DNS_LOOKUP
Controls how the client uses DNS lookups:
- use_all_dns_ips: connect to each returned IP address in sequence until a successful connection is established. After a disconnection, the next IP is used. Once all IPs have been used once, the client resolves the IP(s) from the hostname again.
- resolve_canonical_bootstrap_servers_only: resolve each bootstrap address into a list of canonical names. After the bootstrap phase, this behaves the same as use_all_dns_ips.
- default (deprecated): attempt to connect to the first IP address returned by the lookup, even if the lookup returns multiple IP addresses.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config client_dns_lookup [value]
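The practical difference between use_all_dns_ips and the deprecated default behavior is how many resolved addresses are tried. The Python sketch below injects a resolver and a connect function (both hypothetical stand-ins) so the logic runs without a network; it does not model re-resolution after all IPs have been used, nor resolve_canonical_bootstrap_servers_only.

```python
from typing import Callable, Iterable, List

Resolver = Callable[[str], Iterable[str]]
Connector = Callable[[str], bool]


def connect_use_all_dns_ips(resolve: Resolver, try_connect: Connector, hostname: str) -> str:
    """use_all_dns_ips: try each resolved IP in order until one connects."""
    for ip in resolve(hostname):
        if try_connect(ip):
            return ip
    raise ConnectionError(f"no reachable address for {hostname}")


def connect_default(resolve: Resolver, try_connect: Connector, hostname: str) -> str:
    """default (deprecated): only the first resolved IP is attempted."""
    ips: List[str] = list(resolve(hostname))
    if ips and try_connect(ips[0]):
        return ips[0]
    raise ConnectionError(f"first address for {hostname} is unreachable")


# Hypothetical resolver returning two addresses; only the second accepts connections.
fake_resolve = lambda host: ["10.0.0.1", "10.0.0.2"]
fake_connect = lambda ip: ip == "10.0.0.2"
print(connect_use_all_dns_ips(fake_resolve, fake_connect, "kafka.example.com"))  # 10.0.0.2
```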
Airbyte Config Client Id (airbyte_config.client_id)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_CLIENT_ID
An ID string to pass to the server when making requests. Its purpose is to make it possible to track the source of requests beyond just IP/port, by allowing a logical application name to be included in server-side request logging.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config client_id [value]
Airbyte Config Enable Auto Commit (airbyte_config.enable_auto_commit)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_ENABLE_AUTO_COMMIT
If true, the consumer's offset will be periodically committed in the background.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config enable_auto_commit [value]
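enable_auto_commit and auto_commit_interval_ms work together: offsets are committed in the background no more often than the interval allows. The Python sketch below models this as a check that runs on every poll, with an injectable clock so it can be exercised deterministically; AutoCommitter is a hypothetical class, and the real client's scheduling may differ.

```python
import time
from typing import Callable, List


class AutoCommitter:
    """Commit the current position at most once per auto_commit_interval_ms."""

    def __init__(self, interval_ms: int, commit: Callable[[int], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval_s = interval_ms / 1000.0
        self.commit = commit
        self.clock = clock
        self.last_commit = clock()

    def maybe_commit(self, position: int) -> bool:
        """Called on each poll; commits only if the interval has elapsed."""
        now = self.clock()
        if now - self.last_commit >= self.interval_s:
            self.commit(position)
            self.last_commit = now
            return True
        return False


now = [0.0]
commits: List[int] = []
committer = AutoCommitter(5000, commits.append, clock=lambda: now[0])
committer.maybe_commit(10)  # too soon: nothing committed
now[0] = 5.0
committer.maybe_commit(20)  # interval elapsed: offset 20 is committed
print(commits)  # [20]
```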
Airbyte Config Group Id (airbyte_config.group_id)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_GROUP_ID
The Group ID is how you distinguish different consumer groups.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config group_id [value]
Airbyte Config Max Poll Records (airbyte_config.max_poll_records)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MAX_POLL_RECORDS
The maximum number of records returned in a single call to poll(). Note that max_poll_records does not affect the underlying fetching behavior: the consumer caches the records from each fetch request and returns them incrementally from each poll.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config max_poll_records [value]
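The note about max_poll_records is easiest to see with a small model: fetches fill a local cache, and each poll() drains at most max_poll_records from it without changing how much is fetched. A hypothetical Python sketch; CachingPoller and its fetch callback are illustrative names, not the Kafka client's API.

```python
from collections import deque
from typing import Callable, Deque, List


class CachingPoller:
    """Fetched batches are cached; each poll() returns at most max_poll_records."""

    def __init__(self, fetch: Callable[[], List[str]], max_poll_records: int) -> None:
        self.fetch = fetch
        self.max_poll_records = max_poll_records
        self.buffer: Deque[str] = deque()

    def poll(self) -> List[str]:
        # Only go back to the broker once the cache is empty; fetch size is unaffected.
        if not self.buffer:
            self.buffer.extend(self.fetch())
        n = min(self.max_poll_records, len(self.buffer))
        return [self.buffer.popleft() for _ in range(n)]


batches = [["a", "b", "c", "d", "e"]]
poller = CachingPoller(lambda: batches.pop(0) if batches else [], max_poll_records=2)
print(poller.poll(), poller.poll(), poller.poll())  # ['a', 'b'] ['c', 'd'] ['e']
```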
Airbyte Config Max Records Process (airbyte_config.max_records_process)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_MAX_RECORDS_PROCESS
The maximum number of records to be processed per execution.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config max_records_process [value]
Airbyte Config Polling Time (airbyte_config.polling_time)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_POLLING_TIME
The amount of time the Kafka connector should try to poll for messages.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config polling_time [value]
Airbyte Config Protocol Sasl Jaas Config (airbyte_config.protocol.sasl_jaas_config)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_PROTOCOL_SASL_JAAS_CONFIG
The JAAS login context parameters for SASL connections, in the format used by JAAS configuration files.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config protocol.sasl_jaas_config [value]
Airbyte Config Protocol Sasl Mechanism (airbyte_config.protocol.sasl_mechanism)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_PROTOCOL_SASL_MECHANISM
The SASL mechanism used for client connections (for example, PLAIN). This may be any mechanism for which a security provider is available.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config protocol.sasl_mechanism [value]
Airbyte Config Protocol Security Protocol (airbyte_config.protocol.security_protocol)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_PROTOCOL_SECURITY_PROTOCOL
Options: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config protocol.security_protocol [value]
Airbyte Config Receive Buffer Bytes (airbyte_config.receive_buffer_bytes)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_RECEIVE_BUFFER_BYTES
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config receive_buffer_bytes [value]
Airbyte Config Repeated Calls (airbyte_config.repeated_calls)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_REPEATED_CALLS
The number of repeated calls to poll() if no messages were received.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config repeated_calls [value]
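repeated_calls and max_records_process together bound how long a single execution reads. The Python sketch below is a simplified, assumed model of such a loop, not the Airbyte connector's code: each poll() stands in for a call that waits up to polling_time, the run stops once an empty poll has been repeated repeated_calls times, and at most max_records_process records are kept. read_records is a hypothetical helper.

```python
from typing import Callable, Dict, List

Record = Dict[str, int]


def read_records(poll: Callable[[], List[Record]], repeated_calls: int,
                 max_records_process: int) -> List[Record]:
    """Simplified read loop combining repeated_calls and max_records_process."""
    records: List[Record] = []
    empty_polls = 0
    while len(records) < max_records_process:
        batch = poll()  # in the connector, each poll waits up to polling_time
        if not batch:
            empty_polls += 1
            # Stop after the initial empty poll plus repeated_calls retries.
            if empty_polls > repeated_calls:
                break
            continue
        # Keep only as many records as the per-execution cap still allows.
        records.extend(batch[: max_records_process - len(records)])
    return records


batches: List[List[Record]] = [[{"v": 1}, {"v": 2}], [], [{"v": 3}], [], [], []]
poll = lambda: batches.pop(0) if batches else []
print(len(read_records(poll, repeated_calls=2, max_records_process=100)))  # 3
```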
Airbyte Config Request Timeout Ms (airbyte_config.request_timeout_ms)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_REQUEST_TIMEOUT_MS
This setting controls the maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config request_timeout_ms [value]
Airbyte Config Retry Backoff Ms (airbyte_config.retry_backoff_ms)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_RETRY_BACKOFF_MS
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config retry_backoff_ms [value]
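request_timeout_ms and retry_backoff_ms combine into a retry loop: each attempt waits up to the timeout, and a failed attempt is followed by a pause before the next one. A Python sketch under stated assumptions: send_with_retries and its retries parameter are hypothetical (no retry-count setting is documented on this page), send is assumed to raise TimeoutError when no response arrives, and the backoff here is fixed rather than growing.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def send_with_retries(send: Callable[[float], T], request_timeout_ms: int,
                      retry_backoff_ms: int, retries: int,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry a timed-out request, pausing retry_backoff_ms between attempts."""
    for attempt in range(retries + 1):
        try:
            # send() receives the timeout in seconds and raises TimeoutError on expiry.
            return send(request_timeout_ms / 1000.0)
        except TimeoutError:
            if attempt == retries:
                raise  # retries exhausted: fail the request
            # A pause between attempts avoids retrying in a tight loop.
            sleep(retry_backoff_ms / 1000.0)
    raise ValueError("retries must be >= 0")


attempts: List[float] = []


def flaky_send(timeout_s: float) -> str:
    """Hypothetical request that times out twice before succeeding."""
    attempts.append(timeout_s)
    if len(attempts) < 3:
        raise TimeoutError("no response")
    return "ok"


print(send_with_retries(flaky_send, 30000, 100, retries=5, sleep=lambda s: None))  # ok
```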
Airbyte Config Subscription Subscription Type (airbyte_config.subscription.subscription_type)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_SUBSCRIPTION_SUBSCRIPTION_TYPE
Options: assign, subscribe
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config subscription.subscription_type [value]
Airbyte Config Subscription Topic Partitions (airbyte_config.subscription.topic_partitions)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_SUBSCRIPTION_TOPIC_PARTITIONS
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config subscription.topic_partitions [value]
Airbyte Config Subscription Topic Pattern (airbyte_config.subscription.topic_pattern)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_SUBSCRIPTION_TOPIC_PATTERN
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config subscription.topic_pattern [value]
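The two subscription_type values select topics differently: subscribe consumes every topic matching topic_pattern, while assign reads an explicit list in topic_partitions. The Python sketch below is hypothetical: it assumes topic_pattern is a regular expression matched against the whole topic name, and it assumes a comma-separated topic:partition format for topic_partitions, which this page does not document. subscribed_topics and assigned_partitions are illustrative names.

```python
import re
from typing import List, Tuple


def subscribed_topics(topic_pattern: str, available: List[str]) -> List[str]:
    """subscribe: every topic whose full name matches the regex is consumed."""
    regex = re.compile(topic_pattern)
    return [topic for topic in available if regex.fullmatch(topic)]


def assigned_partitions(topic_partitions: str) -> List[Tuple[str, int]]:
    """assign: parse an explicit 'topic:partition, topic:partition' list (assumed format)."""
    pairs: List[Tuple[str, int]] = []
    for item in topic_partitions.split(","):
        topic, _, partition = item.strip().rpartition(":")
        pairs.append((topic, int(partition)))
    return pairs


print(subscribed_topics(r"orders\..*", ["orders.eu", "orders.us", "payments"]))
print(assigned_partitions("orders.eu:0, orders.eu:1"))
```

An unescaped . in a pattern matches any character, so literal dots in topic names are escaped in the example pattern.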
Airbyte Config Test Topic (airbyte_config.test_topic)
- Environment variable: TAP_KAFKA_AIRBYTE_CONFIG_TEST_TOPIC
The topic used to test whether Airbyte can consume messages.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_config test_topic [value]
Airbyte Spec Image (airbyte_spec.image)
- Environment variable: TAP_KAFKA_AIRBYTE_SPEC_IMAGE
- Default value: airbyte/source-kafka
The Airbyte image to run.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_spec image [value]
Airbyte Spec Tag (airbyte_spec.tag)
- Environment variable: TAP_KAFKA_AIRBYTE_SPEC_TAG
- Default value: latest
The Airbyte image tag.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set airbyte_spec tag [value]
Docker Mounts (docker_mounts)
- Environment variable: TAP_KAFKA_DOCKER_MOUNTS
Docker mounts to make available to the Airbyte container. Expects a list of maps containing source, target, and type, as documented in the docker --mount documentation.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set docker_mounts [value]
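Each docker_mounts entry corresponds to one docker --mount flag of the form type=...,source=...,target=.... The Python sketch below shows that correspondence and also prints the list JSON-encoded, on the assumption that a JSON list is an acceptable [value] for the meltano config command above. mount_args and the paths are hypothetical, and how the Airbyte wrapper actually builds its docker command is not documented here.

```python
import json
from typing import Dict, List


def mount_args(docker_mounts: List[Dict[str, str]]) -> List[str]:
    """Translate docker_mounts entries into `docker run --mount` arguments."""
    args: List[str] = []
    for mount in docker_mounts:
        missing = {"source", "target", "type"} - set(mount)
        if missing:
            raise ValueError(f"mount entry is missing {sorted(missing)}")
        args += ["--mount", f"type={mount['type']},source={mount['source']},target={mount['target']}"]
    return args


# Hypothetical example: bind-mount a local directory of client certificates.
mounts = [{"source": "/home/me/kafka-certs", "target": "/certs", "type": "bind"}]
print(mount_args(mounts))
print(json.dumps(mounts))
```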
SDK Settings
Flattening Enabled (flattening_enabled)
- Environment variable: TAP_KAFKA_FLATTENING_ENABLED
'True' to enable schema flattening and automatically expand nested properties.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set flattening_enabled [value]
Flattening Max Depth (flattening_max_depth)
- Environment variable: TAP_KAFKA_FLATTENING_MAX_DEPTH
The max depth to flatten schemas.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set flattening_max_depth [value]
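Schema flattening expands nested objects into top-level properties, stopping at flattening_max_depth. The Python sketch below flattens a single record; it assumes nested keys are joined with a double underscore and that objects deeper than the limit are kept as JSON strings. Both are assumptions to check against the Meltano SDK version in use, and flatten_record is a hypothetical helper.

```python
import json
from typing import Any, Dict


def flatten_record(record: Dict[str, Any], max_depth: int,
                   parent: str = "", depth: int = 0) -> Dict[str, Any]:
    """Join nested keys with '__' for up to max_depth levels of nesting."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        name = f"{parent}__{key}" if parent else key
        if isinstance(value, dict) and depth < max_depth:
            # Still within the depth limit: expand the nested object.
            flat.update(flatten_record(value, max_depth, name, depth + 1))
        elif isinstance(value, dict):
            # Beyond the limit: keep the remaining object as a JSON string.
            flat[name] = json.dumps(value)
        else:
            flat[name] = value
    return flat


record = {"id": 1, "meta": {"source": "kafka", "offsets": {"start": 0, "end": 9}}}
print(flatten_record(record, max_depth=1))
# {'id': 1, 'meta__source': 'kafka', 'meta__offsets': '{"start": 0, "end": 9}'}
```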
Stream Map Config (stream_map_config)
- Environment variable: TAP_KAFKA_STREAM_MAP_CONFIG
User-defined config values to be used within map expressions.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set stream_map_config [value]
Stream Maps (stream_maps)
- Environment variable: TAP_KAFKA_STREAM_MAPS
Config object for the stream maps capability. For more information, see Stream Maps.
Configure this setting directly using the following Meltano command:
meltano config tap-kafka set stream_maps [value]
Something missing?
This page is generated from a YAML file that you can contribute changes to on GitHub.
Looking for help? Ask in the #plugins-general channel.