Java:JMS(Java Message Service)详解

Java消息服务Java Message ServiceJMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS的组成

JMS 体系结构分两大块,分别是 JMS 提供者 和 JMS 客户。JMS 提供者实现了 JMS 规范,提供了 JMS 语义支持。

JSM 提供者(Provider):JMS 队列,JMS 主题,JMS 消息。

JMS 客户端:JMS 生产者,JMS 消费者。

JMS应用流程

  1. 客户端使用连接工厂(ConnectionFactory)连接 JMS 服务提供者

  2. 使用连接工厂创建连接(Connection)

  3. 使用连接创建会话(Session)

  4. 使用会话创建出生产者(Producer)和消费者(Consumer),并设置目的地(Destinations),标明消息发往何处和消费的消息来源。

    有两种目的地:队列(Queue)和主题(Topic)。

  5. 使用监听器(MessageListener)接收消息,内部有一个 onMessage 方法,就是在这个方法里处理收到消息之后的业务逻辑。

JMS消息结构

JMS 规范规定了消息包括:消息头,消息属性,消息体三部分。

  • 消息头:指定消息的JMS属性,包括目标,持久与否,有效期及优先级。这些属性控制消息如何传送。
  • 消息属性:可选项,可看作消息头的补充,可以自定义消息属性,例如自定义使用属性过滤消息。
  • 消息体:传输的实际数据。JMS API 定义了五种类型的消息格式,抽象接口是 javax.jms.Message,下面五种类型消息继续消息抽象接口。
    • javax.jms.BytesMessage 字节数据
    • javax.jms.MapMessage 键值对
    • javax.jms.ObjectMessage Java 对象
    • javax.jms.StreamMessage 流式消息
    • javax.jms.TextMessage 文本消息

JMS消息模型

Java消息服务应用程序结构支持两种模型:

  • 点对点或队列模型
  • 发布/订阅模型

点对点

一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:

  • 只有一个消费者将获得消息。
  • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
  • 每一个成功处理的消息都由接收者签收,服务器收到签收成功则从队列中删除该条消息。

每条消息仅会被一个消费者消费,消息一旦被消费则会从队列中删除。可能会有多个消费者在监听同一个队列,但队列中的消息只会被一个消费者所消费,也就不存在重复消费的问题。

该模型的消息存在先后顺序,依据的是队列的先进先出(FIFO)特性。消息服务器会按照消息存入队列的顺序,把它们传递给消费者。

点对点 的JMS-API 接口有:

  • javax.jms.QueueConnectionFactory
  • javax.jms.QueueConnection
  • javax.jms.QueueSession
  • javax.jms.Queue
  • javax.jms.QueueSender
  • javax.jms.QueueReceiver

发布/订阅

发布者/订阅者模型支持向一个特定的消息主题发布消息。一个或多个订阅者可以订阅自己感兴趣的主题。

在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:

  • 多个消费者可以获得消息。

  • 在发布者和订阅者之间存在时间依赖性。

    发布者需要创建一个订阅(subscription),以便客户能够订阅。

    订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

发布/订阅模式下,按照消费类型又可分为 集群消费广播消费

发布/订阅 的JMS-API 接口有:

  • javax.jms.TopicConnectionFactory
  • javax.jms.TopicConnection
  • javax.jms.TopicSession
  • javax.jms.Topic
  • javax.jms.TopicPublisher
  • javax.jms.TopicSubscriber

注意:RabbitMQ 和 Kafka 并没有遵循 JMS 规范,但 JMS 是比较早提出消息规范的,后来的消息中间件产品大都借签了它的语议。

MQ基本概念

生产者/消费者

生产者-消费者:指一方生产数据,一方消费数据。在两者之前存在一个缓冲区(一般使用队列),生产者向缓冲区增加数据,消费者从缓冲区消费数据,重复该过程。

观察者模式与发布/订阅

观察者模式与发布/订阅 看着类似,但有本质不同,主要体现在解耦特性上。

  • 观察者模式:观察者 Observer 需要提前注册到主题 Subject 上,当这个主题每发生一件事都会向观察者发送通知。观察者模式在 空间和时间上都是耦合的。通信一般是同步的。
  • MQ发布/订阅:发布/订问模式下多了一个队列,在空间和时间上都是解耦的。通信是异步的,通过队列来实现。

MQ应用场景

消息队列(MQ)是一种不同应用程序之间(跨进程)的通信方法。应用程序通过写入和读取队列的数据(消息)来进行通信,而不是通过彼此之间直接调用(RPC)来进行通信。

异步处理

异步处理是 MQ 天然自带的一特性。使用异步处理可以缩短主流程的响应时间,提升用户体验,提高系统效率。

异步处理特别使用于由某一主流程引起的辅助或扩展流程,两者不是强关联关系,扩展流程的执行不影响主流程。

例如,消息推送,主业务完成了,需要发送多种渠道的消息(短信,公众号,邮件等),则可以用异步来并行处理。

应用解耦

MQ 最直接的使用场景就是可以将两个系统进行解耦。例如订单库存业务,订单系统下单完成后立即返回给用户。可以发送消息到 MQ 由库存系统去做减库存的业务处理。

空间解耦

生产者和消费者相互不需要知道对方的存在。即生产者和消费不再相互依赖,实现了空间解耦。

空间耦合:与一个或给定的接收者直接通信;接收者必须在那个时刻存在。例如,RPC调用。

时间解耦

生产者将数据放入队列后,不需要关心消费者什么时候消费(消费者可以实时消费,也可以根据自己的业务逻辑判断时间去消费),这样就实现了时间解耦。

生产者和消费者有各自的生命周期。

时间耦合 时间解耦
**空间耦合 ** 特征:与一个或一些给定的接收者直接通信;
接收者必须在那个时刻存在。
例子:远程过程调用(RPC)
特征:与一个或一些给定的接收者直接通信,
发送者和接收者可以有自己的生命周期。
例子:苹果推送,Android厂商推送
空间解耦 特征:发送者不需要知道接收者的身份;
接收者必须在那个时刻存在。
例子:代理,网关
特征:发送者不需要知道接收者的身份,
发送者和接收者都可以有自己的生命周期。
例子:发布/订阅系统,消息队列

流量削峰

削峰的本质是利用 MQ 的队列特性,对业务进行排队。

当出现高并发时,MQ按照队列的特性,一个一个入队,一个一个消费。这样可以避免高并发压垮系统的关键组件,如某个核心服务或数据库等。

消费者端可以通过拉取(PULL)的方式来消费数据,并且拉取速度由消费端控制,则可以控制流量趋于平稳。就达到了削峰的目的,或者说起到了流控的作用。

拉取模式由消费端控制,调用一次就拉取一次消息进行消费(可能是一条或多条消息)。这里需要重视消费速度,如果消费性能下降,则会造成消息积压,因此消费端可以自己启用多线程控制并行度以提高消费速度。

拉取模式与监听模式

  • 监听模式:由 MQ 客户端守护进程去不停地拉取消息进行消费。
  • 拉取模式:由用户控制拉取频率,不主动调用就不会消费消息。

最终一致性

在分布式环境中,通常要求实现最终一致性。

例如,订单支付与库存系统,订单支付成功后,会涉及两个操作,一是立即更新订单状态为支付成功,二是发送成功通知到 MQ,可以把这两个操作放在同一个本地事务,要么成功,要么失败。当一次发送 MQ 失败之后,可以结合定时任务进行补偿,这样可以保证订单的结果落地到 MQ 的存储中。

同样库存系统消费端依靠 MQ 重试机制一直触发消息,直到消费端最终确认库存系统减库存民功处理完成。通过消息落地加补偿,消费端业务上面考虑重复消费的保障,也就是做好幂等性操作,利用 MQ 实现了最终一致性。

MQ消费幂等

幂等操作:同一操作发起的一次请求或者多次请求的结果是一致的。

通常读操多都是幂等操作,但要特别注册 增加 和 修改 操作,一定要注意幂等性。特别是涉及支付,交易等业务的时候。

RabbitMQ,RocketMQ,Kafka 都有可能会出现重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,需要开发者在业务逻辑处理时保证。

大多数消息中间件产品都要保证消息一定投递成功,为了不丢失消息宁可让消息重复。所以即要确保消息投递成功,又要确保不重复投递是非常困难的。

特别是在分布式环境下,最大的不稳定因素是网络因素,网络因素造成消息重复消费的原因有如下两种情况:

消息重复发布

生产者发送消息到 MQ,MQ接收到消息后返回响应给生产者时网络闪断,那么生产者会认为消息发送失败,会尝试第二次发送,就造成 MQ 服务端有两份相同的消息。

RabbitMQ 为解决此问题提供了两种方式:一是基于 AMQP 协议中的事务机制;二是把信道设置成确认模式。

  • 事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。
  • 发布者确认机制模仿的是 AMQP 0-9-1 标准中的消费者确认,其最大优势是确认是异步的,在等待返回消息确认之前仍可继续发送消息,相比事务机制,吞吐量大大提高。

消息重复消费

消费者第一次正确消费消息后,通常需要响应 ACK 以告诉 MQ 服务端,若在响应时网络闪断,为了确保消息投递成功,MQ 服务器会尝试第二次投递(这里还要看是 PUSH 模式还是 PULL 模式)。

消费到重复的消息需要在消费者端的业务逻辑保证幂等性。例如对每条业务消息增加业务全局唯一标识(ticket)等,判断是消费过的就丢弃。

AMQP 提供的是 至少一次交付(at-least-once delivery),异常情况下消息会被重复消费,此里需要业务实现幂等性。

Kafka 的每个消费者实例都会为它消费的分区维护属于自己的位置位息来记录当前消费了多少条消息,被称为位移(offset)。消费者需要定期向 Kafka 集群汇报自己的消费进度,这一过程被称为位移提交(offset commit)。万一消费者重启,可以继续从上次消费到的 offset 来继续消费。

幂等操作保证

  1. 利用数据库唯一性约束实现幂等。例如,使用主键,或唯一索引列。

  2. 更新操作前增加前置判断条件,若条件成立则更新,否则就不更新。

    该方法利用的是数据库的 update where 条件。

  3. 记录并检查记录。例如,给消息设置唯一ID,记录已消费的ID,消费时查检查是否已被消费过。

    推荐此方式,而不是把幂等保证若到持久层。

MQ广播消费

发布订阅/模式按消费类型又可分为 集群消费广播消费

集群消费

集群消费:MQ 发送每一条消息,消费者集群中只有一台服务器可以消费到这条消息。

广播消费

广播消费:MQ 发送每一条消息,集群中的每一台服务器至少消费到一次。

示例:消息推送系统。首先客户端与消息中心应用集群建立长连接。消息中心集群中的每一台服务器都可以消费到业务消息,消息中心实例将消息推送给长连接 Session 存在的客户端。

注意:业务方需要关注消费失败,MQ 可以确保每条消息至少被每台消费方服务器消费一次,如果消费方失败,则不会重试。

广播消费有个弊端,不支持顺序消息,因为消费进度在客户端维护。可以 利用集群消费来模拟广播消费,把集群消费中每台服务器上相同的消费 appId 改为不同即可。

MQ控制开始

在监听器模式下,比如新上线服务,并不想服务一启动就开始消费数据,可以设置手动启动消费。

以 RocketMQ 为例,RocketMQ 为消费者提供了开始和暂停方法,如下:

1
2
3
4
5
6
7
public void suspend() {
this.defaultMQPushConsumerImpl.suspend();
}

public void resume() {
this.defaultMQPushConsumerImpl.resume();
}

MQ消息过滤

通常消息中间件会提供消息过滤功能,比如 RocketMQ 等。消息过滤是通过在消息体之个增加 Tag 来实现的。通常用于消息的业务数据有多个状态(多个阶段的生命周期),需要由对应的业务来处理。

不同业务的订阅者都订阅同一个主题,但又关注的是消息业务数据的不同阶段。此时在消息中间件层就进行了过滤,就不需要在消费者端解析消息体过滤,从而达到提高消息性能的目的。

用 Tag 可以区分同一个 Topic 下相互关联的消息。也可以用不同的 Topic,但不同的 Topic 之间是没有必然联系的。

以 RocketMQ 为例:

1
2
3
4
5
Message(String topic, String tags, byte[] body);

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");

MQ消息过期

两种情况:

  • 应该被消费的消息在过期时间内没被消费,通常是消费者异常导致消息积压最终过期。

    创建死信队列,过期消息由侦听死信队列的服务处理。

  • 消费者端业务调整,以前创建的主题或队列废弃掉了不再使用,但生产者仍在发送消息。

    通常创建一个任务,将消息数据迁移到另外的存储中,如 HBase 集群。判断过期,例如一天内没有消费者连接,且消息积压超过5天等。

MQ消息重试

MQ 中间件大都可以配置消息重试的策略。若要主动触发消息重试,在代码中直接抛出 RuntimeException 异常即可,每个 MQ 会根据自己的机制执行重试策略。

要注意重试的幂等性操作,发生了异常导致的重试,应当及时解决这种消费异常,从而保证消费性能,防止MQ服务端消息积压。特点要注意的是,不能用这种重试机制做业务逻辑上的处理。

如果关闭掉重试,且业务消费时抛出了异常,会导致消息消费异常,会表现为一直重复消费某条或某些信息,就像会消费卡住了。一般不建议关闭重试机构。

如果业务能够接受在消费异常时丢弃该消息,可使用 catch 捕获 Exception 不再抛出。

生产者消费者问题

摘自维基百科:生产者消费者问题

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(Bounded-buffer problem),是一个多进程同步问题的经典案例。

该问题描述了共享固定大小缓冲区的两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据消费者也不会在缓冲区中空时消耗数据

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。

通常采用进程间通信的方法解决该问题,常用的方法有信号灯法[1]等。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

实现

不完善的实现

下面这个解决方法会导致竞争条件。如果程序员不够小心,那么他就有可能写出下面这种算法。

该算法使用了两个系统库函数,sleepwakeup。调用 sleep 的进程会被阻断,直到有另一个进程用 wakeup 唤醒之。代码中的 itemCount 用于记录缓冲区中的数据项数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int itemCount = 0;

procedure producer() {
while (true) {
item = produceItem();
if (itemCount == BUFFER_SIZE) {
sleep();
}
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1) {
wakeup(consumer);
}
}
}

procedure consumer() {
while (true) {
if (itemCount == 0) {
sleep();
}
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1) {
wakeup(producer);
}
consumeItem(item);
}
}

上面代码中的问题在于它可能导致竞争条件,进而引发死锁。考虑下面的情形:

  1. 消费者把最后一个 itemCount 的内容读出来,注意它现在是零。消费者返回到while的起始处,现在进入 if 块;
  2. 就在调用sleep之前,CPU决定将时间让给生产者,于是消费者在执行 sleep 之前就被中断了,生产者开始执行;
  3. 生产者生产出一项数据后将其放入缓冲区,然后在 itemCount 上加 1;
  4. 由于缓冲区在上一步加 1 之前为空,生产者尝试唤醒消费者;
  5. 遗憾的是,消费者并没有在休眠,唤醒指令不起作用。当消费者恢复执行的时候,执行 sleep,一觉不醒。出现这种情况的原因在于,消费者只能被生产者在 itemCount 为 1 的情况下唤醒;
  6. 生产者不停地循环执行,直到缓冲区满,随后进入休眠。

由于两个进程都进入了永远的休眠,死锁情况出现了。因此,该算法是不完善的。

使用信号灯的算法

信号灯可以避免上述唤醒指令不起作用的情况。

该方法(见下面的代码)使用了两个信号灯,fillCountemptyCount。fillCount 用于记录缓冲区中将被读取的数据项数(实际上就是有多少数据项在缓冲区里),emptyCount 用于记录缓冲区中空闲空间数。

当有新数据项被放入缓冲区时,fillCount 增加,emptyCount 减少。如果在生产者尝试减少 emptyCount 的时候发现其值为零,那么生产者就进入休眠。等到有数据项被消耗,emptyCount 增加的时候,生产者才被唤醒。消费者的行为类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
semaphore fillCount = 0; // 生产的项目
semaphore emptyCount = BUFFER_SIZE; // 剩余空间

procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
putItemIntoBuffer(item);
up(fillCount);
}
}

procedure consumer() {
while (true) {
down(fillCount);
item = removeItemFromBuffer();
up(emptyCount);
consumeItem(item);
}
}

上述方法在只有一个生产者和一个消费者时能解决问题。对于多个生产者或者多个消费者共享缓冲区的情况,该算法也会导致竞争条件,出现两个或以上的进程同时读或写同一个缓冲区槽的情况。为了说明这种情况是如何发生的,可以假设 putItemIntoBuffer() 的一种可能的实现:先寻找下一个可用空槽,然后写入数据项。下列情形是可能出现的:

  1. 两个生产者都减少 emptyCount 的值;
  2. 某一生产者寻找到下一个可用空槽;
  3. 另一生产者也找到了下一个可用空槽,结果和上一步被找到的是同一个空槽;
  4. 两个生产者向可用空槽写入数据。

为了解决这个问题,需要在保证同一时刻只有一个生产者能够执行 putItemIntoBuffer()。也就是说,需要寻找一种方法来互斥地执行临界区的代码。为了达到这个目的,可引入一个二值信号灯 mutex,其值只能为 1 或者 0。如果把线程放入 down(mutex) 和 up(mutex) 之间,就可以限制只有一个线程能被执行。多生产者、消费者的解决算法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
semaphore mutex = 1;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;

procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
down(mutex);
putItemIntoBuffer(item);
up(mutex);
up(fillCount);
}
}
procedure consumer() {
while (true) {
down(fillCount);
down(mutex);
item = removeItemFromBuffer();
up(mutex);
up(emptyCount);
consumeItem(item);
}
}

使用管程的算法

下列伪代码展示的是使用管程来解决生产者消费者问题的办法。由于管程一定能保证互斥,不必特地考虑保护临界区[2]。也就是说,下面这个方法不用修改就可以推广适用于任意数量的生产者和消费者的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
monitor ProducerConsumer {
int itemCount;
condition full;
condition empty;

procedure add(item) {
while (itemCount == BUFFER_SIZE)
wait(full);
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1)
notify(empty);
}

procedure remove() {
while (itemCount == 0)
wait(empty);
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1)
notify(full);
return item;
}
}

procedure producer() {
while (true) {
item = produceItem()
ProducerConsumer.add(item)
}
}

procedure consumer() {
while (true) {
item = ProducerConsumer.remove()
consumeItem(item)
}
}

注意代码中 while 语句的用法,都是用在测试缓冲区是否已满或空的时候。当存在多个消费者时,有可能造成竞争条件的情况是:某一消费者在一项数据被放入缓冲区中时被唤醒,但是另一消费者已经在管程上等待了一段时间并移除了这项数据。如果 while 语句被改成 if,则会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。

不使用信号灯或者管程

对于生产者消费者问题来说,特别是当只有一个生产者和一个消费者时,实现一个先进先出结构或者通信通道非常重要。这样,生产者-消费者模式就可以在不依赖信号灯、互斥变量或管程的的情况下高效地传输数据。但如果采用这种模式,性能可能下降,因为实现这种模式的代价比较高。人们喜欢用先进先出结构或者通信通道,只是因为可以避免端与端之间的原子性同步。用 C 语言举例如下,请注意:

  1. 该例绕开了对共享变量的原子性“读-改-写”访问:每个 Count 变量都由单进程更新;
  2. 该例并不使进程休眠,这种做法依据系统不同是合理的。方法 sched_yield()只是为了看起来舒服点。完全可以去掉(注意:它后面的分号是不能去掉的)。 进程库通常会要求信号灯或者条件变量控制进程的休眠和唤起,在多处理器环境中,进程的休眠和唤起发生的频率比传递数据符号要小,因此避开对数据原子性操作是有利的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
volatile unsigned int produceCount, consumeCount;
TokenType buffer[BUFFER_SIZE];

void producer(void) {
while (1) {
while (produceCount - consumeCount == BUFFER_SIZE)
sched_yield(); // 缓冲区满
buffer[produceCount % BUFFER_SIZE] = produceToken();
produceCount += 1;
}
}

void consumer(void) {
while (1) {
while (produceCount - consumeCount == 0)
sched_yield(); // 缓冲区空
consumeToken( buffer[consumeCount % BUFFER_SIZE]);
consumeCount += 1;
}
}

Java 中的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;

/**
1个生产者 3个消费者 生产、消费10次

@作者 pt

*/

public class ProducerConsumer {
Stack<Integer> items = new Stack<Integer>();
final static int NO_ITEMS = 10;

public static void main(String args[]) {
ProducerConsumer pc = new ProducerConsumer();
Thread t1 = new Thread(pc.new Producer());
Consumer consumer = pc.new Consumer();
Thread t2 = new Thread(consumer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);
t1.start();
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
t2.start();
t3.start();
t4.start();
try {
t2.join();
t3.join();
t4.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

class Producer implements Runnable {
public void produce(int i) {
System.out.println("Producing " + i);
items.push(new Integer(i));
}

@Override
public void run() {
int i = 0;
// 生产10次
while (i++ < NO_ITEMS) {
synchronized (items) {
produce(i);
items.notifyAll();
}
try {
// 休眠一段时间
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
}

class Consumer implements Runnable {
// consumed计数器允许线程停止
AtomicInteger consumed = new AtomicInteger();

public void consume() {
if (!items.isEmpty()) {
System.out.println("Consuming " + items.pop());
consumed.incrementAndGet();
}
}

private boolean theEnd() {
return consumed.get() >= NO_ITEMS;
}

@Override
public void run() {
while (!theEnd()) {
synchronized (items) {
while (items.isEmpty() && (!theEnd())) {
try {
items.wait(10);
} catch (InterruptedException e) {
Thread.interrupted();
}
}
consume();
}
}
}
}
}

Java:JMS(Java Message Service)详解

http://blog.gxitsky.com/2022/01/17/Java-JMS-01-api-detail/

作者

光星

发布于

2022-01-17

更新于

2022-06-17

许可协议

评论