Connect to the Kafka cluster
The Kafka cluster can be connected to by DNS address and IP address.
We recommend connecting by DNS address because DNS addresses correspond to node roles and lead to the actual IP addresses of the master and replicas. IP addresses correspond to specific nodes. If the master is unavailable, one of the replicas will assume its role, the master's IP address will change, and the IP connection will stop working.
If the cluster is connected to a private subnet and you want to work with it via DNS, connect the cluster subnet to a cloud router with access to the external network. Use the instructions Set up Internet access through a cloud router.
A public IP address cannot be used.
Connection ports
Use ports to connect to Kafka:
- 9092 — port for connection without SSL certificate;
- 9093 — port for connection with SSL certificate.
Ways of connection
View the address for connection
- In the Dashboard, on the top menu, click Products and select Cloud Databases.
- Open the Active tab.
- Open the Database Cluster page → Connection tab.
- In the Addresses to connect block, look up the address.
Connect with SSL
Connecting using TLS/SSL encryption provides a secure connection between your server and the database cluster.
Bash
Python (confluent-kafka)
Python (kafka-python)
Go
Node.js
Java
-
Download the root certificate and place it in the
~/.kafka/
folder:mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Use the connection example for the concumer:
kcat -C \
-b <host>:<port> \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=$HOME/.kafka/root.crtSpecify:
<host>
— DNS address of the node;<port>
— port for connection;<topic_name>
— topic name;<user_name>
— the name of the user with the role of concumer who has access to the topic;<password>
— user password.
-
Use the connection example for the producer:
kcat -P \
-b <host>:<port> \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=$HOME/.kafka/root.crtSpecify:
<host>
— DNS address of the node;<port>
— port for connection;<topic_name>
— topic name;<user_name>
— name of the user with the producer role who has access to the topic;<password>
— user password.
-
Download the root certificate and place it in the
~/.kafka/
folder:mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Install the confluent-kafka library:
pip install confluent-kafka
-
Use the connection example for the concumer:
import confluent_kafka
def error_callback(err):
raise err
consumer = confluent_kafka.Consumer({
"bootstrap.servers": "<host>:<port>",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "<user_name>",
"sasl.password": "<password>",
"group.id": "example",
"ssl.ca.location": "<path>",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"error_cb": error_callback,
})
consumer.subscribe(["<topic_name>"])
while True:
record = consumer.poll(timeout=1.0)
if record is not None:
if record.error():
if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
elif record.error():
raise confluent_kafka.KafkaException(record.error())
else:
print(record.value())
consumer.close()Specify:
<host>
— DNS address of the node;<port>
— port for connection;<user_name>
— the name of the user with the role of concumer who has access to the topic;<password>
— user password;<path>
— the full path to the root certificate;<topic_name>
— topic name.
-
Use the connection example for the producer:
import confluent_kafka
def error_callback(err):
raise err
producer = confluent_kafka.Producer({
"bootstrap.servers": "<host>:<port>",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "<user_name>",
"sasl.password": "<password>",
"ssl.ca.location": "<path>",
"error_cb": error_callback,
})
producer.produce("<topic_name>", "message")
producer.flush(60)Specify:
<host>
— DNS address of the node;<port>
— port for connection;<user_name>
— the name of the user with the producer role who has access to the topic;<password>
— user password;<path>
— the full path to the root certificate;<topic_name>
— topic name.
-
Download the root certificate and place it in the
~/.kafka/
folder:mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Install the kafka-python library:
pip install kafka-python
-
Use the connection example for the concumer:
import kafka
consumer = kafka.KafkaConsumer(
"<topic_name>",
bootstrap_servers="<host>:<port>",
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
ssl_cafile="<path>",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
for record in consumer:
print(record)Specify:
<topic_name>
— topic name<host>
— DNS address of the node;<port>
— port for connection;<path>
— the full path to the root certificate;<user_name>
— the name of the user with the role of concumer who has access to the topic;<password>
— user password.
-
Use the connection example for the producer:
import kafka
producer = kafka.KafkaProducer(
bootstrap_servers="<host>:<port>",
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
ssl_cafile="<path>",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
future = producer.send(
topic="<topic_name>",
value=b"message",
)
result = future.get(timeout=60)
print(result)Specify:
<host>
— DNS address of the node;<port>
— port for connection;<path>
— the full path to the root certificate;<user_name>
— the name of the user with the producer role who has access to the topic;<password>
— user password;<topic_name>
— topic name.
-
Download the root certificate and place it in the
~/.kafka/
folder:mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Create the file
scram.go
:scram.go
package main
import (
"crypto/sha256"
"crypto/sha512"
"github.com/xdg-go/scram"
)
var (
SHA256 scram.HashGeneratorFcn = sha256.New
SHA512 scram.HashGeneratorFcn = sha512.New
)
type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err != nil {
return err
}
x.ClientConversation = x.Client.NewConversation()
return nil
}
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
response, err = x.ClientConversation.Step(challenge)
return
}
func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
} -
Use the connection example for the concumer:
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"os/signal"
"strings"
"github.com/IBM/sarama"
)
const BROKERS = "<host>:<port>"
const USER = "<user_name>"
const PASSWORD = "<password>"
const TOPIC = "<topic_name>"
const CERT = "<path>"
func main() {
brokers := BROKERS
splitBrokers := strings.Split(brokers, ",")
conf := sarama.NewConfig()
conf.Producer.RequiredAcks = sarama.WaitForAll
conf.Version = sarama.V3_3_1_0
conf.Consumer.Return.Errors = true
conf.ClientID = "go_client"
conf.Metadata.Full = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = USER
conf.Net.SASL.Password = PASSWORD
conf.Net.SASL.Handshake = true
conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
}
conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
certs := x509.NewCertPool()
pemPath := CERT
pemData, err := ioutil.ReadFile(pemPath)
if err != nil {
fmt.Println("Couldn't load cert: ", err.Error())
panic(err)
}
certs.AppendCertsFromPEM(pemData)
conf.Net.TLS.Enable = true
conf.Net.TLS.Config = &tls.Config{
RootCAs: certs,
}
master, err := sarama.NewConsumer(splitBrokers, conf)
if err != nil {
fmt.Println("Couldn't create consumer: ", err.Error())
panic(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
}
}()
consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
doneCh := make(chan struct{})
go func() {
for {
select {
case err := <-consumer.Errors():
fmt.Println(err)
case msg := <-consumer.Messages():
fmt.Println("Received messages", string(msg.Key), string(msg.Value))
case <-signals:
doneCh <- struct{}{}
}
}
}()
<-doneCh
}Specify:
<host>
— DNS address of the node;<port>
— port for connection;<user_name>
— the name of the user with the role of concumer who has access to the topic;<password>
— user password;<topic_name>