Connect to the cluster
You can connect to the Kafka cluster by DNS address or by IP address.
We recommend connecting by DNS address: DNS addresses correspond to node roles and always lead to the current IP addresses of the master and replicas. IP addresses correspond to specific nodes. If the master becomes unavailable, one of the replicas takes over its role, the master's IP address changes, and a connection by IP stops working.
If the cluster is connected to a private subnet and you want to work with it via DNS, connect a cloud router with access to the external network to the cluster subnet.
A public IP address cannot be used.
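If you want to check which IP address a node's DNS address currently resolves to (for example, after a failover has moved the master role), you can query DNS from your machine. A quick check, where <host> stands for the address shown on the Connect tab:
# Resolve the current IP address behind the node's DNS name
dig +short <host>
# Alternative on Linux systems without dig
getent hosts <host>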
Ports
Use the following ports to connect to Kafka:
- 9092 — port for connections without an SSL certificate;
- 9093 — port for connections with an SSL certificate.
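Before configuring a client, you can check that the required port is reachable from your machine. A minimal sketch, assuming the standard nc and openssl utilities are installed; <host> is the node address:
# Check the plaintext port
nc -zv <host> 9092
# Check the TLS port; openssl prints the certificate chain on success
openssl s_client -connect <host>:9093 </dev/null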
Ways to connect
- through the kcat (kafkacat) terminal client;
- from program code, with or without SSL.
View the address to connect
- In Control Panel, go to Cloud Platform → Databases.
- Open the Database Cluster page → Connect tab.
- In the Addresses to connect block, look up the address.
Connect with SSL
Connecting using TLS/SSL encryption provides a secure connection between your server and the database cluster.
- Bash
- Python (confluent-kafka)
- Python (kafka-python)
- Node.js
- Java
- Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
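To confirm the download succeeded, you can inspect the certificate. An optional check using the standard openssl utility:
openssl x509 -in ~/.kafka/root.crt -noout -subject -enddate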
- Use the connection example for the consumer:
kafkacat -C \
-b <host>:9093 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=~/.kafka/root.crt
Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password.
- Use the connection example for the producer:
kafkacat -P \
-b <host>:9093 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=~/.kafka/root.crt
Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password.
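To verify connectivity and credentials without producing or consuming messages, you can request cluster metadata with kafkacat's list mode (-L). A quick check using the same connection options as the examples above:
kafkacat -L \
-b <host>:9093 \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=~/.kafka/root.crt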
- Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
- Install the confluent-kafka library:
pip install confluent-kafka
- Use the connection example for the consumer:
import confluent_kafka

def error_callback(err):
    raise err

consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "<host>:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "group.id": "example",
    "ssl.ca.location": "<full_path_to_root_certificate>",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "error_cb": error_callback,
})

consumer.subscribe(["<topic_name>"])

while True:
    record = consumer.poll(timeout=1.0)
    if record is not None:
        if record.error():
            if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
            elif record.error():
                raise confluent_kafka.KafkaException(record.error())
        else:
            print(record.value())

consumer.close()

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password;
- <full_path_to_root_certificate> — full path to the root certificate;
- <topic_name> — name of the topic.
- Use the connection example for the producer:
import confluent_kafka

def error_callback(err):
    raise err

producer = confluent_kafka.Producer({
    "bootstrap.servers": "<host>:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "ssl.ca.location": "<full_path_to_root_certificate>",
    "error_cb": error_callback,
})

producer.produce("<topic_name>", "message")
producer.flush(60)

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <full_path_to_root_certificate> — full path to the root certificate;
- <topic_name> — name of the topic.
- Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
- Install the kafka-python library:
pip install kafka-python
- Use the connection example for the consumer:
import kafka
consumer = kafka.KafkaConsumer(
"<topic_name>",
bootstrap_servers="<host>:9093",
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
ssl_cafile="<full_path_to_root_certificate>",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
for record in consumer:
    print(record)

Specify:
- <topic_name> — name of the topic;
- <host> — DNS address of the node;
- <full_path_to_root_certificate> — full path to the root certificate;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password.
- Use the connection example for the producer:
import kafka
producer = kafka.KafkaProducer(
bootstrap_servers="<host>:9093",
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
ssl_cafile="<full_path_to_root_certificate>",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
future = producer.send(
topic="<topic_name>",
value=b"message",
)
result = future.get(timeout=60)
print(result)

Specify:
- <host> — DNS address of the node;
- <full_path_to_root_certificate> — full path to the root certificate;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
- Install the kafkajs library:
npm install kafkajs
- Use the connection example for the consumer:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9093'],
ssl: {
ca: [fs.readFileSync('<full_path_to_root_certificate>', 'utf-8')],
},
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const consumer = kafka.consumer({ groupId: 'example' });
const run = async () => {
await consumer.connect();
await consumer.subscribe({
topic: '<topic_name>',
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ message }) => {
console.log({ value: message.value.toString() });
},
});
};
run();

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password;
- <full_path_to_root_certificate> — full path to the root certificate;
- <topic_name> — name of the topic.
- Use the connection example for the producer:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9093'],
ssl: {
ca: [fs.readFileSync('<full_path_to_root_certificate>', 'utf-8')],
},
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: '<topic_name>',
messages: [{ value: 'message' }],
});
await producer.disconnect();
};
run();

Specify:
- <host> — DNS address of the node;
- <full_path_to_root_certificate> — full path to the root certificate;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Download the root certificate and place it in the ~/.kafka/ folder:
mkdir -p ~/.kafka/
wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt
- Navigate to the directory where the Java certificate store will be located:
cd /etc/security
- Add the SSL certificate to the Java trusted certificate store (Java Key Store):
keytool -importcert -alias RootCA -file ~/.kafka/root.crt -keystore ssl -storepass <keystore_password> -noprompt
Specify <keystore_password> — the keystore password, used for additional security.
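To confirm the certificate was imported, you can list the contents of the store. An optional check using the same keystore file and password as above:
keytool -list -keystore ssl -storepass <keystore_password>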
- Create a configuration file for Maven, pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>app</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<resources>
<resource>
<directory>src</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>attached</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
- Use the connection example for the consumer:
src/com/example/App.java
package com.example;
import java.time.Duration;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;
public class App {
private static final String HOST = "<host>:9093";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
private static final String TS_FILE = "/etc/security/ssl";
private static final String TS_PASSWORD = "<keystore_password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String deserializer = StringDeserializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("group.id", "example");
props.put("key.deserializer", deserializer);
props.put("value.deserializer", deserializer);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
props.put("ssl.truststore.location", TS_FILE);
props.put("ssl.truststore.password", TS_PASSWORD);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
System.out.println(record.value());
}
}
}
}

Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password;
- <keystore_password> — the keystore password.
- Use the connection example for the producer:
src/com/example/App.java
package com.example;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;
public class App {
private static final String HOST = "<host>:9093";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
private static final String TS_FILE = "/etc/security/ssl";
private static final String TS_PASSWORD = "<keystore_password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String serializer = StringSerializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "all");
props.put("key.serializer", serializer);
props.put("value.serializer", serializer);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
props.put("ssl.truststore.location", TS_FILE);
props.put("ssl.truststore.password", TS_PASSWORD);
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
producer.flush();
producer.close();
} catch (Exception ex) {
System.out.println(ex);
producer.close();
}
}
}

Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <keystore_password> — the keystore password.
- Build and run the application:
mvn clean package
java -jar target/app-1.0-jar-with-dependencies.jar
Connect without SSL
- Bash
- Python (confluent-kafka)
- Python (kafka-python)
- Node.js
- Java
- Open the CLI.
- Use the connection example for the consumer:
kafkacat -C \
-b <host>:9092 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanisms=SCRAM-SHA-512
Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password.
- Use the connection example for the producer:
kafkacat -P \
-b <host>:9092 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanisms=SCRAM-SHA-512
Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password.
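As with the SSL variant, you can verify connectivity and credentials with a metadata request before running the examples. A minimal sketch using kafkacat's list mode (-L):
kafkacat -L \
-b <host>:9092 \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanisms=SCRAM-SHA-512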
- Install the confluent-kafka library:
pip install confluent-kafka
- Use the connection example for the consumer:
import confluent_kafka

def error_callback(err):
    raise err

consumer = confluent_kafka.Consumer({
    "bootstrap.servers": "<host>:9092",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "group.id": "example",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
    "error_cb": error_callback,
})

consumer.subscribe(["<topic_name>"])

while True:
    record = consumer.poll(timeout=1.0)
    if record is not None:
        if record.error():
            if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
                print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
            elif record.error():
                raise confluent_kafka.KafkaException(record.error())
        else:
            print(record.value())

consumer.close()

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Use the connection example for the producer:
import confluent_kafka

def error_callback(err):
    raise err

producer = confluent_kafka.Producer({
    "bootstrap.servers": "<host>:9092",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanism": "SCRAM-SHA-512",
    "sasl.username": "<user_name>",
    "sasl.password": "<password>",
    "error_cb": error_callback,
})

producer.produce("<topic_name>", "message")
producer.flush(60)

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Install the kafka-python library:
pip install kafka-python
- Use the connection example for the consumer:
import kafka
consumer = kafka.KafkaConsumer(
"<topic_name>",
bootstrap_servers="<host>:9092",
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
for record in consumer:
    print(record)

Specify:
- <topic_name> — name of the topic;
- <host> — DNS address of the node;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password.
- Use the connection example for the producer:
import kafka
producer = kafka.KafkaProducer(
bootstrap_servers="<host>:9092",
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
future = producer.send(
topic="<topic_name>",
value=b"message",
)
result = future.get(timeout=60)
print(result)

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Install the kafkajs library:
npm install kafkajs
- Use the connection example for the consumer:
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9092'],
ssl: false,
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const consumer = kafka.consumer({ groupId: 'example' });
const run = async () => {
await consumer.connect();
await consumer.subscribe({
topic: '<topic_name>',
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ message }) => {
console.log({ value: message.value.toString() });
},
});
};
run();

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Use the connection example for the producer:
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9092'],
ssl: false,
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: '<topic_name>',
messages: [{ value: 'message' }],
});
await producer.disconnect();
};
run();

Specify:
- <host> — DNS address of the node;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password;
- <topic_name> — name of the topic.
- Create a configuration file for Maven, pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>app</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<resources>
<resource>
<directory>src</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>attached</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
- Use the connection example for the consumer:
src/com/example/App.java
package com.example;
import java.time.Duration;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;
public class App {
private static final String HOST = "<host>:9092";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String deserializer = StringDeserializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("group.id", "example");
props.put("key.deserializer", deserializer);
props.put("value.deserializer", deserializer);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
System.out.println(record.value());
}
}
}
}

Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the consumer role who has access to the topic;
- <password> — the user's password.
- Use the connection example for the producer:
src/com/example/App.java
package com.example;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;
public class App {
private static final String HOST = "<host>:9092";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String serializer = StringSerializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "all");
props.put("key.serializer", serializer);
props.put("value.serializer", serializer);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
producer.flush();
producer.close();
} catch (Exception ex) {
System.out.println(ex);
producer.close();
}
}
}

Specify:
- <host> — DNS address of the node;
- <topic_name> — name of the topic;
- <user_name> — name of the user with the producer role who has access to the topic;
- <password> — the user's password.
- Build and run the application:
mvn clean package
java -jar target/app-1.0-jar-with-dependencies.jar