# Apache Kafka - Get started

## Deploy a cluster <a href="#apachekafkahowtos-deployacluster" id="apachekafkahowtos-deployacluster"></a>

To get started, log in to ITCare and locate the Global Service that will host your new Apache Kafka cluster.

Search for your Global Service in the top search bar and click on it to display its information page.

Once in your Global Service, click on the **Create Resource** button, select **Apache Kafka** and the required version.

Fill in the form:

* Name of the future cluster
* Number of brokers (3 minimum)
* Sizing
* Storage required on each broker
* Target location
* Target network
* Management options (backup, monitoring, 24/7, remote site replication)

Click **Next** once all fields have been completed.

In the next step, enter the password for the super user account, then click **Next**.

{% hint style="warning" %}
Passwords are not saved by cegedim.cloud. Be sure to save your password!
{% endhint %}

Review the summary before submitting the form.

{% hint style="info" %}
Provisioning can take up to 2 hours, depending on the current automation load.
{% endhint %}

Once the deployment is ready, you'll be notified by e-mail.

## Start a cluster

At the top of the cluster page, click on the **Manage** button, then on **Start** and confirm.

{% hint style="info" %}
Starting the cluster starts all virtual machines attached to it.
{% endhint %}

An e-mail notification will be sent when the service is activated.

## Stop a cluster

At the top of the cluster page, click on the **Manage** button, then on **Stop**.

Enter an RFC number for tracking (optional). Click on **Submit**.

{% hint style="warning" %}
Shutting down a cluster will stop all virtual machines attached to the cluster, and monitoring will be disabled.
{% endhint %}

An e-mail notification will be sent when the cluster is shut down.

## Resize nodes

At the top of the cluster page, click on the **Manage** button, then on **Resize nodes**.

Select the nodes you wish to resize and select the new size (cpu/ram).

{% hint style="info" %}
Each node will be resized and restarted sequentially.
{% endhint %}

An e-mail notification will be sent when all nodes have been resized.

## Delete a cluster

At the top of the cluster page, click on the **Manage** button, then on **Delete**. This will stop and delete all virtual machines.

{% hint style="danger" %}
Please note that this action is not recoverable!
{% endhint %}

Enter an RFC number for tracking (optional), then click **Submit**.

An e-mail notification will be sent when the cluster is deleted.

## How to manage Apache Kafka?

To interact with your secure cluster using Kafka scripts, you first need to download the Apache Kafka archive from the official website.

{% embed url="https://kafka.apache.org/downloads" %}

Ideally, you should download the exact version corresponding to your cluster.

Once downloaded and extracted on your Linux server, you will find the Kafka shell scripts under the `/bin` directory.

These scripts allow you to:

* Produce and consume messages
* Manage users
* Manage topics
* Manage ACLs
* Manage entity configurations

This guide will not go into the details of every script, but will help you get started with simple commands.

### Authentication <a href="#apachekafkahowtos-authentication" id="apachekafkahowtos-authentication"></a>

To connect to a secured Kafka cluster, you need to configure a keystore and a property file.

#### Create keystore <a href="#apachekafkahowtos-createkeystore" id="apachekafkahowtos-createkeystore"></a>

Create the keystore with the provided certificate:

{% code overflow="wrap" %}

```bash
keytool -keystore kafka.client.truststore.jks -alias ca-cert-cluster1 -import -file ca-cert -storepass <redacted> -keypass <redacted> -noprompt
```

{% endcode %}

* **alias**: alias under which the certificate is stored in the keystore
* **file**: name of the file containing the provided certificate
* **storepass** and **keypass**: passwords protecting your keystore; they should be identical

To list the contents of your keystore, use this command:

```bash
keytool -list -v -keystore kafka.client.truststore.jks
```

#### Property file <a href="#apachekafkahowtos-propertyfile" id="apachekafkahowtos-propertyfile"></a>

With the keystore created, you now need a property file (e.g. `client.properties`):

```ini
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";
ssl.truststore.location=/path/to/kafka.client.truststore.jks
ssl.truststore.password=keystore-password
```

* **username**: the Kafka super user provided to you by e-mail
* **password**: the password for that user, which you set at provisioning
* **ssl.truststore.location**: the path to the keystore created previously
* **ssl.truststore.password**: the password to unlock your keystore (the storepass/keypass used above)

#### Command line <a href="#apachekafkahowtos-commandline" id="apachekafkahowtos-commandline"></a>

With these elements in place, you can use any Kafka shell script by adding the following parameter:

```
--command-config client.properties
```
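For example, once the property file is saved as `client.properties`, you can verify connectivity by listing the cluster's topics (the `$BROKERS` variable holds the bootstrap server list shown in ITCare, as defined in the next section):

{% code overflow="wrap" %}

```
kafka-topics.sh --bootstrap-server $BROKERS --list --command-config client.properties
```

{% endcode %}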

### Manage topics <a href="#apachekafkahowtos-managetopics" id="apachekafkahowtos-managetopics"></a>

#### Create a topic <a href="#apachekafkahowtos-createatopic" id="apachekafkahowtos-createatopic"></a>

{% code overflow="wrap" %}

```
# Use an env variable for short commands, the bootstrap-server is available in ITCare
export BROKERS=broker1.hosting.cegedim.cloud:9094,broker2.hosting.cegedim.cloud:9094,broker3.hosting.cegedim.cloud:9094 
```

{% endcode %}

<pre class="language-bash" data-overflow="wrap"><code class="lang-bash"><strong>kafka-topics.sh --bootstrap-server $BROKERS --create --replication-factor 3 --partitions 3 --topic my-topic --command-config client.properties 
</strong>Created topic my-topic.
</code></pre>

#### List topics <a href="#apachekafkahowtos-listtopics" id="apachekafkahowtos-listtopics"></a>

<pre data-overflow="wrap"><code><strong>kafka-topics.sh --bootstrap-server $BROKERS --list --command-config client.properties
</strong></code></pre>

#### Describe a topic <a href="#apachekafkahowtos-describeatopic" id="apachekafkahowtos-describeatopic"></a>

<pre data-overflow="wrap"><code><strong>kafka-topics.sh --bootstrap-server $BROKERS --describe --topic my-topic --command-config client.properties 
</strong>Topic: my-topic		TopicId: 84yqCErzTG27J4wv44dkPQ	PartitionCount: 4	ReplicationFactor: 3	Configs: cleanup.policy=delete
Topic: my-topic		Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
Topic: my-topic		Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
Topic: my-topic		Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
Topic: my-topic		Partition: 3	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
</code></pre>

#### Delete a topic <a href="#apachekafkahowtos-deleteatopic" id="apachekafkahowtos-deleteatopic"></a>

<pre data-overflow="wrap"><code><strong>kafka-topics.sh --bootstrap-server $BROKERS --delete --topic my-topic --command-config client.properties
</strong></code></pre>

#### Add partitions to topic <a href="#apachekafkahowtos-addpartitionstotopic" id="apachekafkahowtos-addpartitionstotopic"></a>

{% code overflow="wrap" %}

```
kafka-topics.sh --bootstrap-server $BROKERS --alter --topic my-topic --partitions 16 --command-config client.properties
```

{% endcode %}

#### List under-replicated partitions for all topics <a href="#apachekafkahowtos-listunder-replicatedpartitionsforalltopics" id="apachekafkahowtos-listunder-replicatedpartitionsforalltopics"></a>

{% code overflow="wrap" %}

```
kafka-topics.sh --bootstrap-server $BROKERS --describe --under-replicated-partitions --command-config client.properties
```

{% endcode %}

#### List ACLs for a topic <a href="#apachekafkahowtos-listaclsforatopic" id="apachekafkahowtos-listaclsforatopic"></a>

{% code overflow="wrap" %}

```
kafka-acls.sh --bootstrap-server $BROKERS --topic=my-topic --list --command-config client.properties
```

{% endcode %}

### Manage users <a href="#apachekafkahowtos-manageusers" id="apachekafkahowtos-manageusers"></a>

#### Create a Kafka user <a href="#apachekafkahowtos-createakafkauser" id="apachekafkahowtos-createakafkauser"></a>

{% code overflow="wrap" %}

```
kafka-configs.sh --bootstrap-server $BROKERS --alter --add-config 'SCRAM-SHA-256=[password=secret]' --entity-type users --entity-name username --command-config client.properties
```

{% endcode %}

#### List Kafka users <a href="#apachekafkahowtos-listkafkausers" id="apachekafkahowtos-listkafkausers"></a>

{% code overflow="wrap" %}

```
kafka-configs.sh --bootstrap-server $BROKERS --describe --entity-type users --command-config client.properties
```

{% endcode %}

#### Delete a Kafka user <a href="#apachekafkahowtos-deleteakafkauser" id="apachekafkahowtos-deleteakafkauser"></a>

{% code overflow="wrap" %}

```
kafka-configs.sh --bootstrap-server $BROKERS --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name username --command-config client.properties
```

{% endcode %}

#### List all ACLs <a href="#apachekafkahowtos-listallacls" id="apachekafkahowtos-listallacls"></a>

{% code overflow="wrap" %}

```
kafka-acls.sh --bootstrap-server $BROKERS --list --command-config client.properties
```

{% endcode %}

#### List ACL for Kafka user <a href="#apachekafkahowtos-listaclforkafkauser" id="apachekafkahowtos-listaclforkafkauser"></a>

{% code overflow="wrap" %}

```
kafka-acls.sh --bootstrap-server $BROKERS --principal User:admin --list --command-config client.properties
```

{% endcode %}

#### Set ACL for Kafka user <a href="#apachekafkahowtos-setaclforkafkauser" id="apachekafkahowtos-setaclforkafkauser"></a>

{% code overflow="wrap" %}

```
kafka-acls.sh --bootstrap-server $BROKERS --add --allow-principal User:alice --producer --topic my-topic --command-config client.properties
```

{% endcode %}
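The `--consumer` convenience option works the same way for reads: it grants the permissions a consumer needs on both the topic and its consumer group (the user, topic, and group names below are only examples):

{% code overflow="wrap" %}

```
kafka-acls.sh --bootstrap-server $BROKERS --add --allow-principal User:bob --consumer --topic my-topic --group my-consumer-group --command-config client.properties
```

{% endcode %}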

#### Remove ACL for Kafka user <a href="#apachekafkahowtos-removeaclforkafkauser" id="apachekafkahowtos-removeaclforkafkauser"></a>

{% code overflow="wrap" %}

```
kafka-acls.sh --bootstrap-server $BROKERS --remove --allow-principal User:bob --consumer --topic my-topic --group my-consumer-group --command-config client.properties
```

{% endcode %}

### Produce <a href="#apachekafkahowtos-produce" id="apachekafkahowtos-produce"></a>

#### Start a producer <a href="#apachekafkahowtos-startaproducer" id="apachekafkahowtos-startaproducer"></a>

{% code overflow="wrap" %}

```
kafka-console-producer.sh --bootstrap-server $BROKERS --topic my-topic --producer.config client.properties 
>
```

{% endcode %}
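The console producer can also parse a key from each input line. As an example, with a `:` separator (an arbitrary choice), typing `user42:hello` sends a message with key `user42` and value `hello`:

{% code overflow="wrap" %}

```
kafka-console-producer.sh --bootstrap-server $BROKERS --topic my-topic --producer.config client.properties --property parse.key=true --property key.separator=:
>user42:hello
```

{% endcode %}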

### Consume <a href="#apachekafkahowtos-consume" id="apachekafkahowtos-consume"></a>

#### Start a consumer <a href="#apachekafkahowtos-startaconsumer" id="apachekafkahowtos-startaconsumer"></a>

{% code overflow="wrap" %}

```
kafka-console-consumer.sh --bootstrap-server $BROKERS --topic my-topic --consumer.config client.properties --group consu
```

{% endcode %}
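Two console-consumer options are often useful: `--from-beginning` replays the topic from its earliest offsets, and `print.key` displays message keys alongside values:

{% code overflow="wrap" %}

```
kafka-console-consumer.sh --bootstrap-server $BROKERS --topic my-topic --consumer.config client.properties --from-beginning --property print.key=true
```

{% endcode %}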

#### List all consumer groups <a href="#apachekafkahowtos-listallconsumergroups" id="apachekafkahowtos-listallconsumergroups"></a>

{% code overflow="wrap" %}

```
kafka-consumer-groups.sh --list --bootstrap-server $BROKERS --command-config client.properties
```

{% endcode %}

#### Describe consumer group <a href="#apachekafkahowtos-describeconsumergroup" id="apachekafkahowtos-describeconsumergroup"></a>

{% code overflow="wrap" %}

```
kafka-consumer-groups.sh --bootstrap-server $BROKERS --describe --group consu --command-config client.properties
```

{% endcode %}

#### Delete a consumer group <a href="#apachekafkahowtos-deleteaconsumergroup" id="apachekafkahowtos-deleteaconsumergroup"></a>

{% code overflow="wrap" %}

```
kafka-consumer-groups.sh --bootstrap-server $BROKERS --group my-group --group my-other-group --delete --command-config client.properties
```

{% endcode %}
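Instead of deleting a group, you can also rewind its committed offsets; `--dry-run` previews the change and `--execute` applies it (the group and topic names below are illustrative):

{% code overflow="wrap" %}

```
kafka-consumer-groups.sh --bootstrap-server $BROKERS --group my-group --topic my-topic --reset-offsets --to-earliest --dry-run --command-config client.properties
```

{% endcode %}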

## Kcat <a href="#apachekafkahowtos-kafkacat" id="apachekafkahowtos-kafkacat"></a>

kcat (formerly kafkacat) is a generic non-JVM producer and consumer for Apache Kafka >=0.8.

{% embed url="https://github.com/edenhill/kafkacat" %}

Version 1.5.0 or later is required for SASL\_SSL authentication support.

More information about kcat is available on the Confluent website:

{% embed url="https://docs.confluent.io/platform/current/app-development/kafkacat-usage.html" %}
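As a sketch (the credentials and certificate path are placeholders), kcat can query cluster metadata over SASL\_SSL using librdkafka configuration properties passed with `-X`:

{% code overflow="wrap" %}

```
kcat -L -b broker1.hosting.cegedim.cloud:9094 \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-256 \
  -X sasl.username=user \
  -X sasl.password=password \
  -X ssl.ca.location=/path/to/ca-cert
```

{% endcode %}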

## Kafka clients <a href="#apachekafkahowtos-kafkaclients" id="apachekafkahowtos-kafkaclients"></a>

Please refer to this documentation to create a Kafka client in the language of your choice:

{% embed url="https://docs.confluent.io/home/clients/overview.html" %}

## Kafka connectors <a href="#apachekafkahowtos-kafkaconnectors" id="apachekafkahowtos-kafkaconnectors"></a>

Please refer to this documentation to learn more about Kafka connectors:

{% embed url="https://docs.confluent.io/home/connect/overview.html" %}

## Kafka add broker node <a href="#apachekafkahowtos-kafkaaddbrokernode" id="apachekafkahowtos-kafkaaddbrokernode"></a>

At the top of the cluster page, click on the **Manage** button, then on **Add broker node**. Select the disk size and the availability zone.

Enter an RFC number for tracking (optional), then click **Submit**.

{% hint style="info" %}
This feature is only available for Kafka 3.6 or later. Only one node can be added at a time. The node's name, RAM, and CPU are predefined to keep the cluster consistent.
{% endhint %}

An e-mail notification will be sent when the node has been created.
