Перейти к основному содержимому

Подключиться к кластеру Kafka

Последнее изменение:

К кластеру Kafka можно подключиться:

  • через консольный клиент kcat. Чтобы избежать возможных ошибок при подключении, рекомендуем использовать библиотеку librdkafka не ниже версии 2.6.1;
  • программный код.

Для всех способов доступно подключение с SSL и без SSL.

При подключении укажите порт и адрес.

Порты для подключения

Для подключения к Kafka используйте порты:

  • 9092 — порт для подключения без SSL-сертификата;
  • 9093 — порт для подключения с SSL-сертификатом.

Адреса для подключения

Адрес для подключения зависит от подсети кластера и от того, откуда вы подключаетесь. Вы можете выбрать адрес в зависимости от одного из сценариев:

К кластеру в приватной подсети подключиться из интернета нельзя.

Подключение к кластеру в публичной подсети

Если кластер в публичной подсети, к нодам можно подключиться по DNS-адресу или IP-адресу из публичной подсети.

Мы рекомендуем подключаться по DNS-адресу. Для DNS-адресов в кластере используется механизм master discovery — адрес привязан к роли ноды, а не к самой ноде. При недоступности мастера одна из реплик становится новым мастером и адрес переходит на новую ноду вместе с ролью.

При подключении по IP-адресу из публичной подсети механизм master discovery не используется. Если одна из реплик станет новым мастером, IP-адрес мастера изменится и подключение по старому IP-адресу перестанет работать.

Вы можете посмотреть адрес для подключения в панели управления.

Подключение из приватной подсети к кластеру в приватной подсети

Если вы подключаетесь из приватной подсети к кластеру в приватной подсети, можно использовать DNS-адрес или приватный IP-адрес.

Мы рекомендуем подключаться по DNS-адресу. Для DNS-адресов в кластере используется механизм master discovery — адрес привязан к роли ноды, а не к самой ноде. При недоступности мастера одна из реплик становится новым мастером и адрес переходит на новую ноду вместе с ролью.

При подключении по приватному IP-адресу механизм master discovery не используется. Если одна из реплик станет новым мастером, IP-адрес мастера изменится и подключение по старому IP-адресу перестанет работать.

Чтобы подключиться из другой приватной подсети, сначала подключите обе приватные подсети к облачному роутеру.

Вы можете посмотреть адрес для подключения в панели управления.

Посмотреть адрес для подключения

  1. В панели управления в верхнем меню нажмите Продукты и выберите Облачные базы данных.
  2. Откройте вкладку Активные.
  3. Откройте страницу кластера баз данных → вкладка Подключение.
  4. В блоке Адреса для подключения посмотрите адрес.

Подключиться с SSL

Подключение с использованием TLS(SSL)-шифрования обеспечивает безопасное соединение между вашим сервером и кластером баз данных.

к сведению

Чтобы избежать возможных ошибок при подключении через консольный клиент kcat, рекомендуем использовать библиотеку librdkafka не ниже версии 2.6.1.

  1. Скачайте корневой сертификат и поместите его в папку ~/.kafka/:

    mkdir -p ~/.kafka/
    wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
    chmod 0600 ~/.kafka/root.crt
  2. Используйте пример подключения для консьюмера:

    kcat -C \
    -b <host>:<port> \
    -t <topic_name> \
    -X sasl.username=<user_name> \
    -X sasl.password=<password> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X ssl.ca.location=$HOME/.kafka/root.crt

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  3. Используйте пример подключения для продюсера:

    kcat -P \
    -b <host>:<port> \
    -t <topic_name> \
    -X sasl.username=<user_name> \
    -X sasl.password=<password> \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X ssl.ca.location=$HOME/.kafka/root.crt

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью продюсер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  1. Скачайте корневой сертификат и поместите его в папку ~/.kafka/:

    mkdir -p ~/.kafka/
    wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
    chmod 0600 ~/.kafka/root.crt
  2. Установите библиотеку confluent-kafka:

    pip install confluent-kafka
  3. Используйте пример подключения для консьюмера:

    import confluent_kafka

    def error_callback(err):
    raise err

    consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "<host>:<port>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "group.id": "example",
    "ssl.ca.location": "<path>",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "error_cb": error_callback,
    })

    consumer.subscribe(["<topic_name>"])

    while True:
    record = consumer.poll(timeout=1.0)
    if record is not None:
    if record.error():
    if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
    print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
    elif record.error():
    raise confluent_kafka.KafkaException(record.error())
    else:
    print(record.value())

    consumer.close()

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <path> — полный путь до корневого сертификата;
    • <topic_name> — имя топика.
  4. Используйте пример подключения для продюсера:

    import confluent_kafka

    def error_callback(err):
    raise err

    producer = confluent_kafka.Producer({
    "bootstrap.servers": "<host>:<port>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "ssl.ca.location": "<path>",
    "error_cb": error_callback,
    })
    producer.produce("<topic_name>", "message")
    producer.flush(60)

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью продюсер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <path> — полный путь до корневого сертификата;
    • <topic_name> — имя топика.
  1. Скачайте корневой сертификат и поместите его в папку ~/.kafka/:

    mkdir -p ~/.kafka/
    wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
    chmod 0600 ~/.kafka/root.crt
  2. Установите библиотеку kafka-python:

    pip install kafka-python
  3. Используйте пример подключения для консьюмера:

    import kafka

    consumer = kafka.KafkaConsumer(
    "<topic_name>",
    bootstrap_servers="<host>:<port>",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    ssl_cafile="<path>",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
    )
    for record in consumer:
    print(record)

    Укажите:

    • <topic_name> — имя топика
    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <path> — полный путь до корневого сертификата;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  4. Используйте пример подключения для продюсера:

    import kafka

    producer = kafka.KafkaProducer(
    bootstrap_servers="<host>:<port>",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    ssl_cafile="<path>",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
    )
    future = producer.send(
    topic="<topic_name>",
    value=b"message",
    )
    result = future.get(timeout=60)
    print(result)

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <path> — полный путь до корневого сертификата;
    • <user_name> — имя пользователя с ролью продюсер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  1. Скачайте корневой сертификат и поместите его в папку ~/.kafka/:

    mkdir -p ~/.kafka/
    wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
    chmod 0600 ~/.kafka/root.crt
  2. Создайте файл scram.go:

    scram.go
    package main

    import (
    "crypto/sha256"
    "crypto/sha512"

    "github.com/xdg-go/scram"
    )

    var (
    SHA256 scram.HashGeneratorFcn = sha256.New
    SHA512 scram.HashGeneratorFcn = sha512.New
    )

    type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
    }

    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    if err != nil {
    return err
    }
    x.ClientConversation = x.Client.NewConversation()
    return nil
    }

    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    response, err = x.ClientConversation.Step(challenge)
    return
    }

    func (x *XDGSCRAMClient) Done() bool {
    return x.ClientConversation.Done()
    }
  3. Используйте пример подключения для консьюмера:

    package main

    import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "os"
    "os/signal"
    "strings"

    "github.com/IBM/sarama"
    )

    const BROKERS = "<host>:<port>"
    const USER = "<user_name>"
    const PASSWORD = "<password>"
    const TOPIC = "<topic_name>"
    const CERT = "<path>"

    func main() {
    brokers := BROKERS
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Version = sarama.V3_3_1_0
    conf.Consumer.Return.Errors = true
    conf.ClientID = "go_client"
    conf.Metadata.Full = true
    conf.Net.SASL.Enable = true
    conf.Net.SASL.User = USER
    conf.Net.SASL.Password = PASSWORD
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
    }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

    certs := x509.NewCertPool()
    pemPath := CERT
    pemData, err := ioutil.ReadFile(pemPath)
    if err != nil {
    fmt.Println("Couldn't load cert: ", err.Error())
    panic(err)
    }
    certs.AppendCertsFromPEM(pemData)

    conf.Net.TLS.Enable = true
    conf.Net.TLS.Config = &tls.Config{
    RootCAs: certs,
    }

    master, err := sarama.NewConsumer(splitBrokers, conf)
    if err != nil {
    fmt.Println("Couldn't create consumer: ", err.Error())
    panic(err)
    }

    defer func() {
    if err := master.Close(); err != nil {
    panic(err)
    }
    }()

    consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)
    if err != nil {
    panic(err)
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    doneCh := make(chan struct{})
    go func() {
    for {
    select {
    case err := <-consumer.Errors():
    fmt.Println(err)
    case msg := <-consumer.Messages():
    fmt.Println("Received messages", string(msg.Key), string(msg.Value))
    case <-signals:
    doneCh <- struct{}{}
    }
    }
    }()

    <-doneCh
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика;
    • <path> — полный путь до корневого сертификата.
  4. Используйте пример подключения для продюсера:

    package main

    import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "strings"

    "github.com/IBM/sarama"
    )

    const BROKERS = "<host>:<port>"
    const USER = "<user_name>"
    const PASSWORD = "<password>"
    const TOPIC = "<topic_name>"
    const CERT = "<path>"

    func main() {
    brokers := BROKERS
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Version = sarama.V3_3_1_0
    conf.ClientID = "go_client"
    conf.Net.SASL.Enable = true
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.User = USER
    conf.Net.SASL.Password = PASSWORD
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
    }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)

    certs := x509.NewCertPool()
    pemPath := CERT
    pemData, err := ioutil.ReadFile(pemPath)
    if err != nil {
    fmt.Println("Couldn't load cert: ", err.Error())
    panic(err)
    }
    certs.AppendCertsFromPEM(pemData)

    conf.Net.TLS.Enable = true
    conf.Net.TLS.Config = &tls.Config{
    RootCAs: certs,
    }

    syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
    if err != nil {
    fmt.Println("Couldn't create producer: ", err.Error())
    panic(err)
    }

    msg := &sarama.ProducerMessage{
    Topic: TOPIC,
    Value: sarama.StringEncoder("message"),
    }
    _, _, err = syncProducer.SendMessage(msg)
    if err != nil {
    fmt.Println("Couldn't send message: ", err.Error())
    panic(err)
    }
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика;
    • <path> — полный путь до корневого сертификата.
  1. Скачайте корневой сертификат и поместите его в папку ~/.kafka/:

    mkdir -p ~/.kafka/
    wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
    chmod 0600 ~/.kafka/root.crt
  2. Установите библиотеку kafkajs:

    npm install kafkajs
  3. Используйте пример подключения для консьюмера:

    const fs = require('fs');
    const { Kafka } = require('kafkajs');

    const config = {
    brokers: ['<host>:<port>'],
    ssl: {
    ca: [fs.readFileSync('<path>', 'utf-8')],
    },
    sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
    },
    };

    const kafka = new Kafka(config);
    const consumer = kafka.consumer({ groupId: 'example' });

    const run = async () => {
    await consumer.connect();
    await consumer.subscribe({
    topic: '<topic_name>',
    fromBeginning: true,
    });

    await consumer.run({
    eachMessage: async ({ message }) => {
    console.log({ value: message.value.toString() });
    },
    });
    };

    run();

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <path> — полный путь до корневого сертификата;
    • <topic_name> — имя топика.
  4. Используйте пример подключения для продюсера:

    const fs = require('fs');
    const { Kafka } = require('kafkajs');

    const config = {
    brokers: ['<host>:<port>'],
    ssl: {
    ca: [fs.readFileSync('<path>', 'utf-8')],
    },
    sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
    },
    };

    const kafka = new Kafka(config);
    const producer = kafka.producer();

    const run = async () => {
    await producer.connect();
    await producer.send({
    topic: '<topic_name>',
    messages: [{ value: 'message' }],
    });
    await producer.disconnect();
    };

    run();

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <path> — полный путь до корневого сертификата;
    • <user_name> — имя пользователя с ролью продюсер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  1. Скачайте корневой сертификат и поместите его в папку ~/.kafka/:

    mkdir -p ~/.kafka/
    wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
    chmod 0600 ~/.kafka/root.crt
  2. Перейдите в каталог, где будет располагаться хранилище сертификатов Java:

    cd /etc/security
  3. Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store):

    keytool \
    -importcert \
    -alias RootCA \
    -file ~/.kafka/root.crt \
    -keystore ssl \
    -storepass <keystore password> \
    -noprompt

    Укажите <keystore_password> — пароль хранилища для дополнительной защиты.

  4. Создайте конфигурационный файл для Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>app</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>
    <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.7</version>
    </dependency>
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.14.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
    </dependency>
    </dependencies>
    <build>
    <finalName>${project.artifactId}-${project.version}</finalName>
    <sourceDirectory>src</sourceDirectory>
    <resources>
    <resource>
    <directory>src</directory>
    </resource>
    </resources>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
    <execution>
    <goals>
    <goal>attached</goal>
    </goals>
    <phase>package</phase>
    <configuration>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
    <manifest>
    <mainClass>com.example.App</mainClass>
    </manifest>
    </archive>
    </configuration>
    </execution>
    </executions>
    </plugin>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
    <archive>
    <manifest>
    <mainClass>com.example.App</mainClass>
    </manifest>
    </archive>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </project>
  5. Используйте пример подключения для консьюмера:

    src/com/example/App.java
    package com.example;

    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;

    public class App {
    private static final String HOST = "<host>:<port>";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";
    private static final String TS_FILE = "/etc/security/ssl";
    private static final String TS_PASSWORD = "<keystore_password>";

    public static void main(String[] args) {
    String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", "example");
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASSWORD);

    Consumer<String, String>consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record: records) {
    System.out.println(record.value());
    }
    }
    }
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <keystore_password> — пароль хранилища.
  6. Используйте пример подключения для продюсера:

    src/com/example/App.java
    package com.example;

    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;

    public class App {
    private static final String HOST = "<host>:<port>";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";
    private static final String TS_FILE = "/etc/security/ssl";
    private static final String TS_PASSWORD = "<keystore_password>";

    public static void main(String[] args) {
    String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);
    props.put("ssl.truststore.location", TS_FILE);
    props.put("ssl.truststore.password", TS_PASSWORD);

    Producer<String, String> producer = new KafkaProducer<>(props);
    try {
    producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
    producer.flush();
    producer.close();
    } catch (Exception ex) {
    System.out.println(ex);
    producer.close();
    }
    }
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью продюсер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <keystore_password> — пароль сертификата.
  7. Соберите и запустите приложение:

    mvn clean package
    java -jar target/app-1.0-jar-with-dependencies.jar

Подключиться без SSL

к сведению

Чтобы избежать возможных ошибок при подключении через консольный клиент kcat, рекомендуем использовать библиотеку librdkafka не ниже версии 2.6.1.

  1. Откройте CLI.

  2. Используйте пример подключения для консьюмера:

    kcat -C \
    -b <host>:<port> \
    -t <topic_name> \
    -X sasl.username=<user_name> \
    -X sasl.password=<password> \
    -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanisms=SCRAM-SHA-512

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  3. Используйте пример подключения для продюсера:

    kcat -P \
    -b <host>:<port> \
    -t <topic_name> \
    -X sasl.username=<user_name> \
    -X sasl.password=<password> \
    -X security.protocol=SASL_PLAINTEXT \
    -X sasl.mechanisms=SCRAM-SHA-512

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью продюсер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  1. Установите библиотеку confluent-kafka:

    pip install confluent-kafka
  2. Используйте пример подключения для консьюмера:

    import confluent_kafka

    def error_callback(err):
    raise err

    consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "<host>:<port>",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "group.id": "example",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "error_cb": error_callback,
    })

    consumer.subscribe(["<topic_name>"])

    while True:
    record = consumer.poll(timeout=1.0)
    if record is not None:
    if record.error():
    if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
    print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
    elif record.error():
    raise confluent_kafka.KafkaException(record.error())
    else:
    print(record.value())

    consumer.close()

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  3. Используйте пример подключения для продюсера:

    import confluent_kafka

    def error_callback(err):
    raise err

    producer = confluent_kafka.Producer({
    "bootstrap.servers": "<host>:<port>",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "error_cb": error_callback,
    })
    producer.produce("<topic_name>", "message")
    producer.flush(60)

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя базы данных;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  1. Установите библиотеку kafka-python:

    pip install kafka-python
  2. Используйте пример подключения для консьюмера:

    import kafka

    consumer = kafka.KafkaConsumer(
    "<topic_name>",
    bootstrap_servers="<host>:<port>",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
    )
    for record in consumer:
    print(record)

    Укажите:

    • <topic_name> — имя топика
    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  3. Используйте пример подключения для продюсера:

    import kafka

    producer = kafka.KafkaProducer(
    bootstrap_servers="<host>:<port>",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
    )
    future = producer.send(
    topic="<topic_name>",
    value=b"message",
    )
    result = future.get(timeout=60)
    print(result)

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя базы данных;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  1. Создайте файл scram.go:

    scram.go
    package main

    import (
    "crypto/sha256"
    "crypto/sha512"

    "github.com/xdg-go/scram"
    )

    var (
    SHA256 scram.HashGeneratorFcn = sha256.New
    SHA512 scram.HashGeneratorFcn = sha512.New
    )

    type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
    }

    func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    if err != nil {
    return err
    }
    x.ClientConversation = x.Client.NewConversation()
    return nil
    }

    func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    response, err = x.ClientConversation.Step(challenge)
    return
    }

    func (x *XDGSCRAMClient) Done() bool {
    return x.ClientConversation.Done()
    }
  2. Используйте пример подключения для консьюмера:

    package main

    import (
    "fmt"
    "os"
    "os/signal"
    "strings"

    "github.com/IBM/sarama"
    )

    const BROKERS = "<host>:<port>"
    const USER = "<user_name>"
    const PASSWORD = "<password>"
    const TOPIC = "<topic_name>"

    func main() {
    brokers := BROKERS
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Version = sarama.V3_3_1_0
    conf.Consumer.Return.Errors = true
    conf.ClientID = "go_client"
    conf.Metadata.Full = true
    conf.Net.SASL.Enable = true
    conf.Net.SASL.User = USER
    conf.Net.SASL.Password = PASSWORD
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
    }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    conf.Net.TLS.Enable = false

    master, err := sarama.NewConsumer(splitBrokers, conf)
    if err != nil {
    fmt.Println("Couldn't create consumer: ", err.Error())
    panic(err)
    }

    defer func() {
    if err := master.Close(); err != nil {
    panic(err)
    }
    }()

    consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)
    if err != nil {
    panic(err)
    }

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    doneCh := make(chan struct{})
    go func() {
    for {
    select {
    case err := <-consumer.Errors():
    fmt.Println(err)
    case msg := <-consumer.Messages():
    fmt.Println("Received messages", string(msg.Key), string(msg.Value))
    case <-signals:
    doneCh <- struct{}{}
    }
    }
    }()

    <-doneCh
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  3. Используйте пример подключения для продюсера:

    package main

    import (
    "fmt"
    "strings"

    "github.com/IBM/sarama"
    )

    const BROKERS = "<host>:<port>"
    const USER = "<user_name>"
    const PASSWORD = "<password>"
    const TOPIC = "<topic_name>"

    func main() {
    brokers := BROKERS
    splitBrokers := strings.Split(brokers, ",")
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Version = sarama.V3_3_1_0
    conf.ClientID = "go_client"
    conf.Net.SASL.Enable = true
    conf.Net.SASL.Handshake = true
    conf.Net.SASL.User = USER
    conf.Net.SASL.Password = PASSWORD
    conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
    return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
    }
    conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
    conf.Net.TLS.Enable = false

    syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
    if err != nil {
    fmt.Println("Couldn't create producer: ", err.Error())
    panic(err)
    }

    msg := &sarama.ProducerMessage{
    Topic: TOPIC,
    Value: sarama.StringEncoder("message"),
    }
    _, _, err = syncProducer.SendMessage(msg)
    if err != nil {
    fmt.Println("Couldn't send message: ", err.Error())
    panic(err)
    }
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  1. Установите библиотеку kafkajs:

    npm install kafkajs
  2. Используйте пример подключения для консьюмера:

    const { Kafka } = require('kafkajs');

    const config = {
    brokers: ['<host>:<port>'],
    ssl: false,
    sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
    },
    };

    const kafka = new Kafka(config);
    const consumer = kafka.consumer({ groupId: 'example' });

    const run = async () => {
    await consumer.connect();
    await consumer.subscribe({
    topic: '<topic_name>',
    fromBeginning: true,
    });

    await consumer.run({
    eachMessage: async ({ message }) => {
    console.log({ value: message.value.toString() });
    },
    });
    };

    run();

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  3. Используйте пример подключения для продюсера:

    const { Kafka } = require('kafkajs');

    const config = {
    brokers: ['<host>:<port>'],
    ssl: false,
    sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
    },
    };

    const kafka = new Kafka(config);
    const producer = kafka.producer();

    const run = async () => {
    await producer.connect();
    await producer.send({
    topic: '<topic_name>',
    messages: [{ value: 'message' }],
    });
    await producer.disconnect();
    };

    run();

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <user_name> — имя пользователя базы данных;
    • <password> — пароль пользователя;
    • <topic_name> — имя топика.
  1. Создайте конфигурационный файл для Maven:

    pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>app</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>
    <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.7</version>
    </dependency>
    <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.14.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
    </dependency>
    </dependencies>
    <build>
    <finalName>${project.artifactId}-${project.version}</finalName>
    <sourceDirectory>src</sourceDirectory>
    <resources>
    <resource>
    <directory>src</directory>
    </resource>
    </resources>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <executions>
    <execution>
    <goals>
    <goal>attached</goal>
    </goals>
    <phase>package</phase>
    <configuration>
    <descriptorRefs>
    <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
    <manifest>
    <mainClass>com.example.App</mainClass>
    </manifest>
    </archive>
    </configuration>
    </execution>
    </executions>
    </plugin>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>3.1.0</version>
    <configuration>
    <archive>
    <manifest>
    <mainClass>com.example.App</mainClass>
    </manifest>
    </archive>
    </configuration>
    </plugin>
    </plugins>
    </build>
    </project>
  2. Используйте пример подключения для консьюмера:

    src/com/example/App.java
    package com.example;

    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.clients.consumer.*;

    public class App {
    private static final String HOST = "<host>:<port>";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";

    public static void main(String[] args) {
    String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);

    String deserializer = StringDeserializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("group.id", "example");
    props.put("key.deserializer", deserializer);
    props.put("value.deserializer", deserializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Consumer<String, String>consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(new String[] {TOPIC}));

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record: records) {
    System.out.println(record.value());
    }
    }
    }
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя с ролью консьюмер, который имеет доступ к топику;
    • <password> — пароль пользователя.
  3. Используйте пример подключения для продюсера:

    src/com/example/App.java
    package com.example;

    import java.util.*;
    import org.apache.kafka.common.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.clients.producer.*;

    public class App {
    private static final String HOST = "<host>:<port>";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";

    public static void main(String[] args) {
    String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);

    String serializer = StringSerializer.class.getName();
    Properties props = new Properties();
    props.put("bootstrap.servers", HOST);
    props.put("acks", "all");
    props.put("key.serializer", serializer);
    props.put("value.serializer", serializer);
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    props.put("sasl.jaas.config", jaasCfg);

    Producer<String, String> producer = new KafkaProducer<>(props);
    try {
    producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
    producer.flush();
    producer.close();
    } catch (Exception ex) {
    System.out.println(ex);
    producer.close();
    }
    }
    }

    Укажите:

    • <host> — DNS-адрес ноды;
    • <port>порт для подключения;
    • <topic_name> — имя топика;
    • <user_name> — имя пользователя базы данных;
    • <password> — пароль пользователя.
  4. Соберите и запустите приложение:

    mvn clean package
    java -jar target/app-1.0-jar-with-dependencies.jar