Connect to the cluster
You can connect to the Kafka cluster by DNS address or by IP address.
We recommend connecting by DNS address: DNS addresses correspond to node roles and resolve to the current IP addresses of the master and replicas, whereas IP addresses correspond to specific nodes. If the master becomes unavailable, one of the replicas assumes its role, the master's IP address changes, and a connection by IP address stops working.
If the cluster is connected to a private subnet and you want to work with it via DNS, connect the cluster subnet to a cloud router with access to the external network. See the instructions in Set up internet access via cloud router.
A public IP address cannot be used.
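Before configuring a client, it can help to check that the cluster's DNS address resolves and that the broker port is reachable from your network. The following is a minimal sketch using only the Python standard library; the function names `resolve_host` and `is_port_open` are illustrative and are not part of any Kafka client.

```python
import socket


def resolve_host(host):
    """Return the sorted, de-duplicated IP addresses the host name currently resolves to."""
    infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP
    return sorted({info[4][0] for info in infos})


def is_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False
```

For example, `is_port_open("<host>", 9093)` checks the SSL port listed in the Ports section. A successful TCP connection only shows that the port is reachable; it does not verify credentials or certificates.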
Ports
Use the following ports to connect to Kafka:
- 9092: port for connections without an SSL certificate;
- 9093: port for connections with an SSL certificate.
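The pairing of port and security protocol used throughout the examples on this page can be captured in a small helper. This is only a sketch: the function name `connection_settings` is hypothetical, and the returned keys follow the property names used by the kcat and confluent-kafka examples.

```python
def connection_settings(host, use_ssl):
    """Return the bootstrap address and security settings for the chosen mode."""
    # 9093 is the SSL port, 9092 the port without SSL
    port = 9093 if use_ssl else 9092
    return {
        "bootstrap.servers": f"{host}:{port}",
        "security.protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl.mechanism": "SCRAM-SHA-512",
    }
```

The result can be merged with the user name, password, and, for SSL, the certificate path before being passed to a client.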
Connection methods
- through the kcat terminal client;
- from program code, with or without SSL.
View the connection address
- In the control panel, go to Cloud platform → Databases.
- Open the Database Cluster page → Connection tab.
- In the Addresses for connection block, find the address.
Connect with SSL
Connecting using TLS/SSL encryption provides a secure connection between your server and the database cluster.
Bash
Python (confluent-kafka)
Python (kafka-python)
Node.js
Java
-
Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
-
Use the connection example for the consumer:
kcat -C \
  -b <host>:9093 \
  -t <topic_name> \
  -X sasl.username=<user_name> \
  -X sasl.password=<password> \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-512 \
  -X ssl.ca.location=$HOME/.kafka/root.crt
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password.
-
Use the connection example for the producer:
kcat -P \
  -b <host>:9093 \
  -t <topic_name> \
  -X sasl.username=<user_name> \
  -X sasl.password=<password> \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-512 \
  -X ssl.ca.location=$HOME/.kafka/root.crt
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password.
-
Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
-
Install the confluent-kafka library:
pip install confluent-kafka
-
Use the connection example for the consumer:
import confluent_kafka


def error_callback(err):
    # err is a KafkaError, which is not an exception; wrap it before raising
    raise confluent_kafka.KafkaException(err)


consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "<host>:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "group.id": "example",
    "ssl.ca.location": "<full_path_to_root_certificate>",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "error_cb": error_callback,
})
consumer.subscribe(["<topic_name>"])

try:
    while True:
        record = consumer.poll(timeout=1.0)
        if record is None:
            # No message arrived within the timeout
            continue
        if record.error():
            if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
            else:
                raise confluent_kafka.KafkaException(record.error())
        else:
            print(record.value())
finally:
    # Close the consumer even if the loop is interrupted or raises
    consumer.close()
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password;
<full_path_to_root_certificate>: full path to the root certificate;
<topic_name>: topic name.
-
Use the connection example for the producer:
import confluent_kafka


def error_callback(err):
    # err is a KafkaError, which is not an exception; wrap it before raising
    raise confluent_kafka.KafkaException(err)


producer = confluent_kafka.Producer({
    "bootstrap.servers": "<host>:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "ssl.ca.location": "<full_path_to_root_certificate>",
    "error_cb": error_callback,
})

producer.produce("<topic_name>", "message")
# Wait up to 60 seconds for queued messages to be delivered
producer.flush(60)
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<full_path_to_root_certificate>: full path to the root certificate;
<topic_name>: topic name.
-
Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
-
Install the kafka-python library:
pip install kafka-python
-
Use the connection example for the consumer:
import kafka

consumer = kafka.KafkaConsumer(
    "<topic_name>",
    bootstrap_servers="<host>:9093",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    ssl_cafile="<full_path_to_root_certificate>",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
)

# Iterating over the consumer blocks and yields records as they arrive
for record in consumer:
    print(record)
Specify:
<topic_name>: topic name;
<host>: DNS address of the node;
<full_path_to_root_certificate>: full path to the root certificate;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password.
-
Use the connection example for the producer:
import kafka

producer = kafka.KafkaProducer(
    bootstrap_servers="<host>:9093",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    ssl_cafile="<full_path_to_root_certificate>",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
)

future = producer.send(
    topic="<topic_name>",
    value=b"message",
)
# Block for up to 60 seconds until the broker acknowledges the message
result = future.get(timeout=60)
print(result)
Specify:
<host>: DNS address of the node;
<full_path_to_root_certificate>: full path to the root certificate;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
-
Install the kafkajs library:
npm install kafkajs
-
Use the connection example for the consumer:
const fs = require('fs');
const { Kafka } = require('kafkajs');

const config = {
  brokers: ['<host>:9093'],
  ssl: {
    ca: [fs.readFileSync('<full_path_to_root_certificate>', 'utf-8')],
  },
  sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
  },
};

const kafka = new Kafka(config);
const consumer = kafka.consumer({ groupId: 'example' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({
    topic: '<topic_name>',
    fromBeginning: true,
  });
  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log({ value: message.value.toString() });
    },
  });
};

// Log errors instead of leaving the promise rejection unhandled
run().catch(console.error);
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password;
<full_path_to_root_certificate>: full path to the root certificate;
<topic_name>: topic name.
-
Use the connection example for the producer:
const fs = require('fs');
const { Kafka } = require('kafkajs');

const config = {
  brokers: ['<host>:9093'],
  ssl: {
    ca: [fs.readFileSync('<full_path_to_root_certificate>', 'utf-8')],
  },
  sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
  },
};

const kafka = new Kafka(config);
const producer = kafka.producer();

const run = async () => {
  await producer.connect();
  await producer.send({
    topic: '<topic_name>',
    messages: [{ value: 'message' }],
  });
  await producer.disconnect();
};

// Log errors instead of leaving the promise rejection unhandled
run().catch(console.error);
Specify:
<host>: DNS address of the node;
<full_path_to_root_certificate>: full path to the root certificate;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
-
Navigate to the directory where the Java certificate store will be located:
cd /etc/security
-
Add the SSL certificate to the Java trusted certificate store (Java KeyStore):
keytool -importcert -alias RootCA -file ~/.kafka/root.crt -keystore ssl -storepass <keystore_password> -noprompt
Specify:
<keystore_password>: keystore password for additional protection.
-
Create a configuration file for Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>app</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<resources>
<resource>
<directory>src</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
-
Use the connection example for the consumer:
src/com/example/App.java
package com.example;

import java.time.Duration;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {
    private static final String HOST = "<host>:9093";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";
    private static final String TS_FILE = "/etc/security/ssl";
    private static final String TS_PASSWORD = "<keystore_password>";

    public static void main(String[] args) {
        String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
        String deserializer = StringDeserializer.class.getName();

        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("group.id", "example");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("ssl.truststore.location", TS_FILE);
        props.put("ssl.truststore.password", TS_PASSWORD);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            // poll(long) is deprecated; pass a Duration instead
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password;
<keystore_password>: keystore password.
-
Use the connection example for the producer:
src/com/example/App.java
package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {
    private static final String HOST = "<host>:9093";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";
    private static final String TS_FILE = "/etc/security/ssl";
    private static final String TS_PASSWORD = "<keystore_password>";

    public static void main(String[] args) {
        String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
        String serializer = StringSerializer.class.getName();

        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put("acks", "all");
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);
        props.put("ssl.truststore.location", TS_FILE);
        props.put("ssl.truststore.password", TS_PASSWORD);

        // try-with-resources closes the producer on success and on failure
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges the record
            producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
            producer.flush();
        } catch (Exception ex) {
            System.out.println(ex);
        }
    }
}
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<keystore_password>: keystore password.
-
Build and run the application:
mvn clean package
java -jar target/app-1.0-jar-with-dependencies.jar
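Before running any of the SSL examples above, you may want to confirm that the downloaded root certificate exists, loads as a CA file, and has the restrictive permissions set by chmod 0600. The sketch below uses only the Python standard library; the function names `is_owner_only` and `load_ca_context` are hypothetical helpers introduced for illustration.

```python
import os
import ssl
import stat


def is_owner_only(path):
    """Return True if group and others have no permissions on the file (e.g. mode 0600)."""
    mode = stat.S_IMODE(os.stat(os.path.expanduser(path)).st_mode)
    return mode & 0o077 == 0


def load_ca_context(path):
    """Build an SSL context that trusts the given CA file.

    Raises FileNotFoundError if the file is missing; a file without a valid
    PEM certificate typically raises ssl.SSLError.
    """
    return ssl.create_default_context(cafile=os.path.expanduser(path))
```

For example, `is_owner_only("~/.kafka/root.crt")` and `load_ca_context("~/.kafka/root.crt")` check the file created in the download step. Loading the file only shows that it parses as a certificate, not that it matches the cluster.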
Connect without SSL
Bash
Python (confluent-kafka)
Python (kafka-python)
Node.js
Java
-
Open the CLI.
-
Use the connection example for the consumer:
kcat -C \
  -b <host>:9092 \
  -t <topic_name> \
  -X sasl.username=<user_name> \
  -X sasl.password=<password> \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanisms=SCRAM-SHA-512
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password.
-
Use the connection example for the producer:
kcat -P \
  -b <host>:9092 \
  -t <topic_name> \
  -X sasl.username=<user_name> \
  -X sasl.password=<password> \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanisms=SCRAM-SHA-512
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password.
-
Install the confluent-kafka library:
pip install confluent-kafka
-
Use the connection example for the consumer:
import confluent_kafka


def error_callback(err):
    # err is a KafkaError, which is not an exception; wrap it before raising
    raise confluent_kafka.KafkaException(err)


consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "<host>:9092",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "group.id": "example",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "error_cb": error_callback,
})
consumer.subscribe(["<topic_name>"])

try:
    while True:
        record = consumer.poll(timeout=1.0)
        if record is None:
            # No message arrived within the timeout
            continue
        if record.error():
            if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
            else:
                raise confluent_kafka.KafkaException(record.error())
        else:
            print(record.value())
finally:
    # Close the consumer even if the loop is interrupted or raises
    consumer.close()
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Use the connection example for the producer:
import confluent_kafka


def error_callback(err):
    # err is a KafkaError, which is not an exception; wrap it before raising
    raise confluent_kafka.KafkaException(err)


producer = confluent_kafka.Producer({
    "bootstrap.servers": "<host>:9092",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "error_cb": error_callback,
})

producer.produce("<topic_name>", "message")
# Wait up to 60 seconds for queued messages to be delivered
producer.flush(60)
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Install the kafka-python library:
pip install kafka-python
-
Use the connection example for the consumer:
import kafka

consumer = kafka.KafkaConsumer(
    "<topic_name>",
    bootstrap_servers="<host>:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
)

# Iterating over the consumer blocks and yields records as they arrive
for record in consumer:
    print(record)
Specify:
<topic_name>: topic name;
<host>: DNS address of the node;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password.
-
Use the connection example for the producer:
import kafka

producer = kafka.KafkaProducer(
    bootstrap_servers="<host>:9092",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="<user_name>",
    sasl_plain_password="<password>",
)

future = producer.send(
    topic="<topic_name>",
    value=b"message",
)
# Block for up to 60 seconds until the broker acknowledges the message
result = future.get(timeout=60)
print(result)
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Install the kafkajs library:
npm install kafkajs
-
Use the connection example for the consumer:
const { Kafka } = require('kafkajs');

const config = {
  brokers: ['<host>:9092'],
  ssl: false,
  sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
  },
};

const kafka = new Kafka(config);
const consumer = kafka.consumer({ groupId: 'example' });

const run = async () => {
  await consumer.connect();
  await consumer.subscribe({
    topic: '<topic_name>',
    fromBeginning: true,
  });
  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log({ value: message.value.toString() });
    },
  });
};

// Log errors instead of leaving the promise rejection unhandled
run().catch(console.error);
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Use the connection example for the producer:
const { Kafka } = require('kafkajs');

const config = {
  brokers: ['<host>:9092'],
  ssl: false,
  sasl: {
    mechanism: 'scram-sha-512',
    username: '<user_name>',
    password: '<password>',
  },
};

const kafka = new Kafka(config);
const producer = kafka.producer();

const run = async () => {
  await producer.connect();
  await producer.send({
    topic: '<topic_name>',
    messages: [{ value: 'message' }],
  });
  await producer.disconnect();
};

// Log errors instead of leaving the promise rejection unhandled
run().catch(console.error);
Specify:
<host>: DNS address of the node;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password;
<topic_name>: topic name.
-
Create a configuration file for Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>app</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<resources>
<resource>
<directory>src</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
-
Use the connection example for the consumer:
src/com/example/App.java
package com.example;

import java.time.Duration;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;

public class App {
    private static final String HOST = "<host>:9092";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";

    public static void main(String[] args) {
        String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
        String deserializer = StringDeserializer.class.getName();

        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("group.id", "example");
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            // poll(long) is deprecated; pass a Duration instead
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the consumer role who has access to the topic;
<password>: user password.
-
Use the connection example for the producer:
src/com/example/App.java
package com.example;

import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;

public class App {
    private static final String HOST = "<host>:9092";
    private static final String TOPIC = "<topic_name>";
    private static final String USER = "<user_name>";
    private static final String PASSWORD = "<password>";

    public static void main(String[] args) {
        String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
        String serializer = StringSerializer.class.getName();

        Properties props = new Properties();
        props.put("bootstrap.servers", HOST);
        props.put("acks", "all");
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasCfg);

        // try-with-resources closes the producer on success and on failure
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // get() blocks until the broker acknowledges the record
            producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
            producer.flush();
        } catch (Exception ex) {
            System.out.println(ex);
        }
    }
}
Specify:
<host>: DNS address of the node;
<topic_name>: topic name;
<user_name>: name of the user with the producer role who has access to the topic;
<password>: user password.
-
Build and run the application:
mvn clean package
java -jar target/app-1.0-jar-with-dependencies.jar