```xml
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>${activemq.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>${activemq.version}</version>
</dependency>
```

### 1、描述

原始状态的 activemq 集成非常方便，也更适合定制。有些同学，可能对原始接口会比较陌生，会希望有个具体的示例。

完整的集成代码参考：

https://gitee.com/opensolon/solon-examples/tree/main/b.Solon-Integration/demoB004-activemq

希望更加简化使用的同学，可以使用：

[activemq-solon-cloud-plugin](/article/755)  （使用更简单，定制性弱些）


### 2、配置项示例

添加 yml 配置。并约定（也可按需定义）：

* "solon.activemq"，作为配置前缀
* "properties"，作为公共配置
* "producer"，作为生态者专属配置（估计用不到）
* "consumer"，作为消费者专属配置（估计用不到）

具体的配置属性，参考自：ActiveMQConnectionFactory


```yaml
solon.app:
  name: "demo-app"
  group: "demo"

# 配置可以自由定义，与 @Bean 代码对应起来即可（以下为参考）
solon.activemq:
  properties:  #公共配置（配置项，参考：ActiveMQConnectionFactory）
    brokerURL: "failover:tcp://localhost:61616"
    redeliveryPolicy:
      initialRedeliveryDelay: 5000
      backOffMultiplier: 2
      useExponentialBackOff: true
      maximumRedeliveries: -1
      maximumRedeliveryDelay: 3600_000
```

添加 java 配置器

```java
@Configuration
public class ActivemqConfig {
    @Bean(destroyMethod = "stop")
    public Connection client(@Inject("${solon.activemq.properties}") Props common) throws Exception {
        String brokerURL = (String) common.remove("brokerURL");
        String userName = (String) common.remove("userName");
        String password = (String) common.remove("password");

        ActiveMQConnectionFactory factory;
        if (Utils.isEmpty(userName)) {
            factory = new ActiveMQConnectionFactory(brokerURL);
        } else {
            factory = new ActiveMQConnectionFactory(brokerURL, userName, password);
        }

        //绑定额外的配置并创建连接
        Connection connection = common.bindTo(factory).createConnection();
        connection.start();
        return connection;
    }

    @Bean
    public IProducer producer(Connection connection) throws Exception {
        return new IProducer(connection);
    }

    @Bean
    public void consumer(Connection connection,
                         MessageListener messageListener) throws Exception {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = session.createTopic("topic.test");
        MessageConsumer consumer = session.createConsumer(destination);

        consumer.setMessageListener(messageListener);
    }
}
```

activemq 的消息发送的代码比较复杂，所以我们再做个包装处理（在上面的配置时构建）：

```java
public class IProducer {
    private Connection connection;

    public IProducer(Connection connection) {
        this.connection = connection;
    }

    public void send(String topic, MessageBuilder messageBuilder) throws JMSException {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        Destination destination = session.createTopic(topic);
        MessageProducer producer = session.createProducer(destination);

        producer.send(destination, messageBuilder.build(session));
    }

    @FunctionalInterface
    public static interface MessageBuilder {
        Message build(Session session) throws JMSException;
    }
}
```


### 3、代码应用

发送（或生产），这里代控制器由用户请求再发送消息（仅供参考）：

```java
@Controller
public class DemoController {
    @Inject
    private IProducer producer;

    @Mapping("/send")
    public void send(String msg) throws Exception {
        //发送
        producer.send("topic.test", s -> s.createTextMessage("test"));
    }
}
```

监听（或消费），这里采用订阅回调的方式：（仅供参考）

```java
@Component
public class DemoMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println(message);
        RunUtil.runAndTry(message::acknowledge);
    }
}
```

