```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
```

### 1、描述

原始状态的 kafka 集成非常方便，也更适合定制。有些同学，可能对原始接口会比较陌生，会希望有个具体的示例。

完整的集成代码参考：

https://gitee.com/opensolon/solon-examples/tree/main/b.Solon-Integration/demoB001-kafka

希望更加简化使用的同学，可以使用：

[kafka-solon-cloud-plugin](/article/157)  （使用更简单，定制性弱些）


### 2、配置项示例


添加 yml 配置。并约定（也可按需定义）：

* "solon.kafka"，作为配置前缀
* "properties"，作为公共配置
* "producer"，作为生态者专属配置
* "consumer"，作为消费者专属配置

具体的配置属性，参考自：ProducerConfig，ConsumerConfig


```yaml
solon.app:
  name: "demo-app"
  group: "demo"

# 配置前缀，可以自由定义，与 @Bean 代码对应起来即可（以下为参考）
solon.kafka:
  properties:  #公共配置（配置项，参考：ProducerConfig，ConsumerConfig 的公用部分）
    bootstrap:
      servers: "127.0.0.1:9092"
    key:
      serializer: "org.apache.kafka.common.serialization.StringSerializer"
      deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
    value:
      serializer: "org.apache.kafka.common.serialization.StringSerializer"
      deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
  producer: #生产者专属配置（配置项，参考：ProducerConfig）
    acks: "all"
  consumer: #消费者专属配置（配置项，参考：ConsumerConfig）
    enable:
      auto:
        commit: "false"
    isolation:
      level: "read_committed"
    group:
      id: "${solon.app.group}:${solon.app.name}"
```

添加 java 配置器

```java
@Configuration
public class KafkaConfig {
    @Bean
    public KafkaProducer<String, String> producer(@Inject("${solon.kafka.properties}") Properties common,
                                             @Inject("${solon.kafka.producer}") Properties producer) {

        Properties props = new Properties();
        props.putAll(common);
        props.putAll(producer);

        return new KafkaProducer<>(props);
    }

    @Bean
    public KafkaConsumer<String, String> consumer(@Inject("${solon.kafka.properties}") Properties common,
                                                  @Inject("${solon.kafka.consumer}") Properties consumer) {
        Properties props = new Properties();
        props.putAll(common);
        props.putAll(consumer);

        return new KafkaConsumer<>(props);
    }
}
```




### 3、代码应用

发送（或生产），这里代控制器由用户请求再发送消息（仅供参考）：

```java
@Controller
public class DemoController {
    @Inject
    private KafkaProducer<String, String> producer;

    @Mapping("/send")
    public void send(String msg) {
        //发送
        producer.send(new ProducerRecord<>("topic.test", msg));
    }
}
```

拉取（或消费），这里采用定时拦取方式：（仅供参考）

```java
@Component
public class DemoJob {
    @Inject
    private KafkaConsumer<String, String> consumer;

    @Init
    public void init() {
        //订阅
        consumer.subscribe(Arrays.asList("topic.test"));
    }

    @Scheduled(fixedDelay = 10_000L, initialDelay = 10_000L)
    public void job() throws Exception {
        //拉取
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            //确认
            consumer.commitSync();
        }
    }
}
```

