Подключиться к кластеру Kafka
К кластеру Kafka можно подключиться:
- через консольный клиент kcat. Чтобы избежать возможных ошибок при подключении, рекомендуем использовать библиотеку librdkafka не ниже версии 2.6.1;
- программный код.
Для всех способов доступно подключение с SSL и без SSL.
При подключении укажите порт и адрес.
Порты для подключения
Для подключения к Kafka используйте порты:
- 9092 — порт для подключения без SSL-сертификата;
- 9093 — порт для подключения с SSL-сертификатом.
Адреса для подключения
Адрес для подключения зависит от подсети кластера и от того, откуда вы подключаетесь. Вы можете выбрать адрес в зависимости от одного из сценариев:
- подключение к кластеру в публичной подсети;
- подключение из приватной подсети к кластеру в приватной подсети.
К кластеру в приватной подсети подключиться из интернета нельзя.

Подключение к кластеру в публичной подсети
Если кластер в публичной подсети, к нодам можно подключиться по DNS-адресу или IP-адресу из публичной подсети.
Мы рекомендуем подключаться по DNS-адресу. Для DNS-адресов в кластере используется механизм master discovery — адрес привязан к роли ноды, а не к самой ноде. При недоступности мастера одна из реплик становится новым мастером и адрес переходит на новую ноду вместе с ролью.
При подключении по IP-адресу из публичной подсети механизм master discovery не используется. Если одна из реплик станет новым мастером, IP-адрес мастера изменится и подключение по старому IP-адресу перестанет работать.
Вы можете посмотреть адрес для подключения в панели управления.
Подключение из приватной подсети к кластеру в приватной подсети
Если вы подключаетесь из приватной подсети к кластеру в приватной подсети, можно использовать DNS-адрес или приватный IP-адрес.
Мы рекомендуем подключаться по DNS-адресу. Для DNS-адресов в кластере используется механизм master discovery — адрес привязан к роли ноды, а не к самой ноде. При недоступности мастера одна из реплик становится новым мастером и адрес переходит на новую ноду вместе с ролью.
При подключении по приватному IP-адресу механизм master discovery не используется. Если одна из реплик станет новым мастером, IP-адрес мастера изменится и подключение по старому IP-адресу перестанет работать.
Чтобы подключиться из другой приватной подсети, сначала подключите обе приватные подсети к облачному роутеру.
Вы можете посмотреть адрес для подключения в панели управления.
Посмотреть адрес для подключения
- В панели управления в верхнем меню нажмите Проду кты и выберите Облачные базы данных.
- Откройте вкладку Активные.
- Откройте страницу кластера баз данных → вкладка Подключение.
- В блоке Адреса для подключения посмотрите адрес.
Подключиться с SSL
Подключение с использованием TLS(SSL)-шифрования обеспечивает безопасное соединение между вашим сервером и кластером баз данных.
Bash
Python (confluent-kafka)
Python (kafka-python)
Go
Node.js
Java
Чтобы избежать возможных ошибок при подключении через консольный клиент kcat, рекомендуем использовать библиотеку librdkafka не ниже версии 2.6.1.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Используйте пример подключения для консьюмера:
kcat -C \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_SSL \-X sasl.mechanisms=SCRAM-SHA-512 \-X ssl.ca.location=$HOME/.kafka/root.crtУкажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль польз ователя.
-
Используйте пример подключения для продюсера:
kcat -P \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_SSL \-X sasl.mechanisms=SCRAM-SHA-512 \-X ssl.ca.location=$HOME/.kafka/root.crtУкажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>— пароль пользователя.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Установите библиотеку confluent-kafka:
pip install confluent-kafka -
Используйте пример подключения для консьюмера:
import confluent_kafkadef error_callback(err):raise errconsumer = confluent_kafka.Consumer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_SSL","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","group.id": "example","ssl.ca.location": "<path>","auto.offset.reset": "earliest","enable.auto.commit": False,"error_cb": error_callback,})consumer.subscribe(["<topic_name>"])while True:record = consumer.poll(timeout=1.0)if record is not None:if record.error():if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")elif record.error():raise confluent_kafka.KafkaException(record.error())else:print(record.value())consumer.close()Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<path>— полный путь до корневого сертификата;<topic_name>— имя топика.
-
Используйте пример подключения для продюсера:
import confluent_kafkadef error_callback(err):raise errproducer = confluent_kafka.Producer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_SSL","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","ssl.ca.location": "<path>","error_cb": error_callback,})producer.produce("<topic_name>", "message")producer.flush(60)Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>— пароль пользователя;<path>— полный путь до корневого сертификата;<topic_name>— имя топика.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Установите библиотеку kafka-python:
pip install kafka-python -
Используйте приме р подключения для консьюмера:
import kafkaconsumer = kafka.KafkaConsumer("<topic_name>",bootstrap_servers="<host>:<port>",security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512",ssl_cafile="<path>",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)for record in consumer:print(record)Укажите:
<topic_name>— имя топика<host>— DNS-адрес ноды;<port>— порт для подключения;<path>— полный путь до корневого сертификата;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя.
-
Используйте пример подключения для продюсера:
import kafkaproducer = kafka.KafkaProducer(bootstrap_servers="<host>:<port>",security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512",ssl_cafile="<path>",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)future = producer.send(topic="<topic_name>",value=b"message",)result = future.get(timeout=60)print(result)Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<path>— полный путь до корневого сертификата;<user_name>— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Создайте файл
scram.go:scram.go
package mainimport ("crypto/sha256""crypto/sha512""github.com/xdg-go/scram")var (SHA256 scram.HashGeneratorFcn = sha256.NewSHA512 scram.HashGeneratorFcn = sha512.New)type XDGSCRAMClient struct {*scram.Client*scram.ClientConversationscram.HashGeneratorFcn}func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)if err != nil {return err}x.ClientConversation = x.Client.NewConversation()return nil}func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {response, err = x.ClientConversation.Step(challenge)return}func (x *XDGSCRAMClient) Done() bool {return x.ClientConversation.Done()} -
Используйте пример подключения для консьюмера:
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""os""os/signal""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"const CERT = "<path>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Version = sarama.V3_3_1_0conf.Consumer.Return.Errors = trueconf.ClientID = "go_client"conf.Metadata.Full = trueconf.Net.SASL.Enable = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.Handshake = trueconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)certs := x509.NewCertPool()pemPath := CERTpemData, err := ioutil.ReadFile(pemPath)if err != nil {fmt.Println("Couldn't load cert: ", err.Error())panic(err)}certs.AppendCertsFromPEM(pemData)conf.Net.TLS.Enable = trueconf.Net.TLS.Config = &tls.Config{RootCAs: certs,}master, err := sarama.NewConsumer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create consumer: ", err.Error())panic(err)}defer func() {if err := master.Close(); err != nil {panic(err)}}()consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)if err != nil {panic(err)}signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)doneCh := make(chan struct{})go func() {for {select {case err := <-consumer.Errors():fmt.Println(err)case msg := <-consumer.Messages():fmt.Println("Received messages", string(msg.Key), string(msg.Value))case <-signals:doneCh <- struct{}{}}}}()<-doneCh}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика;<path>— полный путь до корневого сертификата.
-
Используйте пример подключения для продюсера:
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"const CERT = "<path>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Producer.Return.Successes = trueconf.Version = sarama.V3_3_1_0conf.ClientID = "go_client"conf.Net.SASL.Enable = trueconf.Net.SASL.Handshake = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)certs := x509.NewCertPool()pemPath := CERTpemData, err := ioutil.ReadFile(pemPath)if err != nil {fmt.Println("Couldn't load cert: ", err.Error())panic(err)}certs.AppendCertsFromPEM(pemData)conf.Net.TLS.Enable = trueconf.Net.TLS.Config = &tls.Config{RootCAs: certs,}syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create producer: ", err.Error())panic(err)}msg := &sarama.ProducerMessage{Topic: TOPIC,Value: sarama.StringEncoder("message"),}_, _, err = syncProducer.SendMessage(msg)if err != nil {fmt.Println("Couldn't send message: ", err.Error())panic(err)}}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика;<path>— полный путь до корневого сертификата.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Установите библиотеку kafkajs:
npm install kafkajs -
Используйте пример подключения для консьюмера:
const fs = require('fs');const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: {ca: [fs.readFileSync('<path>', 'utf-8')],},sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const consumer = kafka.consumer({ groupId: 'example' });const run = async () => {await consumer.connect();await consumer.subscribe({topic: '<topic_name>',fromBeginning: true,});await consumer.run({eachMessage: async ({ message }) => {console.log({ value: message.value.toString() });},});};run();Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<path>— полный путь до корневого сертификата;<topic_name>— имя топика.
-
Используйте пример подключения для продюсера:
const fs = require('fs');const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: {ca: [fs.readFileSync('<path>', 'utf-8')],},sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const producer = kafka.producer();const run = async () => {await producer.connect();await producer.send({topic: '<topic_name>',messages: [{ value: 'message' }],});await producer.disconnect();};run();Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<path>— полный путь до корневого сертификата;<user_name>— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Перейдите в каталог, где будет располагаться хранилище сертификатов Java:
cd /etc/security -
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store):
keytool \-importcert \-alias RootCA \-file ~/.kafka/root.crt \-keystore ssl \-storepass <keystore password> \-nopromptУкажите
<keystore_password>— пароль хранилища для дополнительной защиты. -
Создайте конфигурационный файл для Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>app</artifactId><packaging>jar</packaging><version>1.0</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency></dependencies><build><finalName>${project.artifactId}-${project.version}</finalName><sourceDirectory>src</sourceDirectory><resources><resource><directory>src</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><goals><goal>attached</goal></goals><phase>package</phase><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></plugin></plugins></build></project> -
Используйте пример подключения для консьюмера:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.clients.consumer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";private static final String TS_FILE = "/etc/security/ssl";private static final String TS_PASSWORD = "<keystore_password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String deserializer = StringDeserializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("group.id", "example");props.put("key.deserializer", deserializer);props.put("value.deserializer", deserializer);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);props.put("ssl.truststore.location", TS_FILE);props.put("ssl.truststore.password", TS_PASSWORD);Consumer<String, String>consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(new String[] {TOPIC}));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record: records) {System.out.println(record.value());}}}}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<keystore_password>— пароль хранилища.
-
Используйте пример подключения для продюсера:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.clients.producer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";private static final String TS_FILE = "/etc/security/ssl";private static final String TS_PASSWORD = "<keystore_password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String serializer = StringSerializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put("acks", "all");props.put("key.serializer", serializer);props.put("value.serializer", serializer);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);props.put("ssl.truststore.location", TS_FILE);props.put("ssl.truststore.password", TS_PASSWORD);Producer<String, String> producer = new KafkaProducer<>(props);try {producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();producer.flush();producer.close();} catch (Exception ex) {System.out.println(ex);producer.close();}}}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>— пароль пользователя;<keystore_password>— пароль сертификата.
-
Соберите и запустите приложение:
mvn clean packagejava -jar target/app-1.0-jar-with-dependencies.jar
Подключиться без SSL
Bash
Python (confluent-kafka)
Python (kafka-python)
Go
Node.js
Java
Чтобы избежать возможных ошибок при подключении через консольный клиент kcat, рекомендуем использовать библиотеку librdkafka не ниже версии 2.6.1.
-
Откройте CLI.
-
Используйте пример подключения для консьюмера:
kcat -C \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanisms=SCRAM-SHA-512Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— им я топика;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя.
-
Используйте пример подключения для продюсера:
kcat -P \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanisms=SCRAM-SHA-512Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>— пароль пользователя.
-
Установите библиотеку confluent-kafka:
pip install confluent-kafka -
Используйте пример подключения для консьюмера:
import confluent_kafkadef error_callback(err):raise errconsumer = confluent_kafka.Consumer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_PLAINTEXT","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","group.id": "example","auto.offset.reset": "earliest","enable.auto.commit": False,"error_cb": error_callback,})consumer.subscribe(["<topic_name>"])while True:record = consumer.poll(timeout=1.0)if record is not None:if record.error():if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")elif record.error():raise confluent_kafka.KafkaException(record.error())else:print(record.value())consumer.close()Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика.
-
Используйте пример подключения для продюсера:
import confluent_kafkadef error_callback(err):raise errproducer = confluent_kafka.Producer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_PLAINTEXT","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","error_cb": error_callback,})producer.produce("<topic_name>", "message")producer.flush(60)Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя базы данных;<password>— пароль пользователя;<topic_name>— имя топика.
-
Установите библиотеку kafka-python:
pip install kafka-python -
Используйте пример подключения для консьюмера:
import kafkaconsumer = kafka.KafkaConsumer("<topic_name>",bootstrap_servers="<host>:<port>",security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)for record in consumer:print(record)Укажите:
<topic_name>— имя топика<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя.
-
Используйте пример подключения для продюсера:
import kafkaproducer = kafka.KafkaProducer(bootstrap_servers="<host>:<port>",security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)future = producer.send(topic="<topic_name>",value=b"message",)result = future.get(timeout=60)print(result)Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя базы данных;<password>— пароль пользователя;<topic_name>— имя топика.
-
Создайте файл
scram.go:scram.go
package mainimport ("crypto/sha256""crypto/sha512""github.com/xdg-go/scram")var (SHA256 scram.HashGeneratorFcn = sha256.NewSHA512 scram.HashGeneratorFcn = sha512.New)type XDGSCRAMClient struct {*scram.Client*scram.ClientConversationscram.HashGeneratorFcn}func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)if err != nil {return err}x.ClientConversation = x.Client.NewConversation()return nil}func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {response, err = x.ClientConversation.Step(challenge)return}func (x *XDGSCRAMClient) Done() bool {return x.ClientConversation.Done()} -
Используйте пример подключения для консьюмера:
package mainimport ("fmt""os""os/signal""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Version = sarama.V3_3_1_0conf.Consumer.Return.Errors = trueconf.ClientID = "go_client"conf.Metadata.Full = trueconf.Net.SASL.Enable = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.Handshake = trueconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)conf.Net.TLS.Enable = falsemaster, err := sarama.NewConsumer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create consumer: ", err.Error())panic(err)}defer func() {if err := master.Close(); err != nil {panic(err)}}()consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)if err != nil {panic(err)}signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)doneCh := make(chan struct{})go func() {for {select {case err := <-consumer.Errors():fmt.Println(err)case msg := <-consumer.Messages():fmt.Println("Received messages", string(msg.Key), string(msg.Value))case <-signals:doneCh <- struct{}{}}}}()<-doneCh}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика.
-
Используйте пример подключения для продюсера:
package mainimport ("fmt""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Producer.Return.Successes = trueconf.Version = sarama.V3_3_1_0conf.ClientID = "go_client"conf.Net.SASL.Enable = trueconf.Net.SASL.Handshake = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)conf.Net.TLS.Enable = falsesyncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create producer: ", err.Error())panic(err)}msg := &sarama.ProducerMessage{Topic: TOPIC,Value: sarama.StringEncoder("message"),}_, _, err = syncProducer.SendMessage(msg)if err != nil {fmt.Println("Couldn't send message: ", err.Error())panic(err)}}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика.
-
Установите библиотеку kafkajs:
npm install kafkajs -
Используйте пример подключения для консьюмера:
const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: false,sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const consumer = kafka.consumer({ groupId: 'example' });const run = async () => {await consumer.connect();await consumer.subscribe({topic: '<topic_name>',fromBeginning: true,});await consumer.run({eachMessage: async ({ message }) => {console.log({ value: message.value.toString() });},});};run();Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя;<topic_name>— имя топика.
-
Используйте пример подключения для продюсера:
const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: false,sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const producer = kafka.producer();const run = async () => {await producer.connect();await producer.send({topic: '<topic_name>',messages: [{ value: 'message' }],});await producer.disconnect();};run();Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<user_name>— имя пользователя базы данных;<password>— пароль пользователя;<topic_name>— имя топика.
-
Создайте конфигурационный файл для Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>app</artifactId><packaging>jar</packaging><version>1.0</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency></dependencies><build><finalName>${project.artifactId}-${project.version}</finalName><sourceDirectory>src</sourceDirectory><resources><resource><directory>src</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><goals><goal>attached</goal></goals><phase>package</phase><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></plugin></plugins></build></project> -
Используйте пример подключения для консьюмера:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.clients.consumer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String deserializer = StringDeserializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("group.id", "example");props.put("key.deserializer", deserializer);props.put("value.deserializer", deserializer);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);Consumer<String, String>consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(new String[] {TOPIC}));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record: records) {System.out.println(record.value());}}}}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>— пароль пользователя.
-
Используйте пример подключения для продюсера:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.clients.producer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String serializer = StringSerializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put("acks", "all");props.put("key.serializer", serializer);props.put("value.serializer", serializer);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);Producer<String, String> producer = new KafkaProducer<>(props);try {producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();producer.flush();producer.close();} catch (Exception ex) {System.out.println(ex);producer.close();}}}Укажите:
<host>— DNS-адрес ноды;<port>— порт для подключения;<topic_name>— имя топика;<user_name>— имя пользователя базы данных;<password>— пароль пользователя.
-
Соберите и запустите приложение:
mvn clean packagejava -jar target/app-1.0-jar-with-dependencies.jar