4.2 Transport connectors

——ActiveMQ连接

2016-04-28 23:44:51.0

4.2 传输连接器

为了交换消息,生产者和消息者之间需要连接到服务端。client-to-broker,客户端到服务端的通讯是通过传输连接器实现的(transport connectors)。ActiveMQ提供了客户端可以使用的连接到服务端的协议列表。ActiveMQ用户的需求是各大不相同的。有些用户关注于性能,其他人可能关注安全性。ActiveMQ去覆盖所有方面,并且为每个用例都提供了一种连接器。

在这部分中,你会学到传输连接器在ActiveMQ配置文件中如何配置,并改造股票投资系统去演示不同的连接器。下面的部分,我们经历经由网络连接到服务端的可用协议。也会介绍一下嵌入式服务端和用以在应用内部的服务端之间交互的虚拟机协议(第7章讲到)。

4.2.1 配置传输协议

    从服务端角度来看,传输协议就是一个接收并监听客户端消息的机制。如果你仔细看一下ActiveMQ的demo的配置(examples/conf/activemq-demo.xml),你会发现下面示例中的配置传输协议的片段:

<transportConnectors>
	<!-- Create a TCP transport that is advertised on via an IP multicast
	  group named default. -->
	<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
	<!-- Create a SSL transport. Make sure to configure the SSL options
	  via the system properties or the sslContext element. -->
	<transportConnector name="ssl" uri="ssl://localhost:61617"/>
	<!-- Create a STOMP transport for STOMP clients. -->
	<transportConnector name="stomp" uri="stomp://localhost:61613"/>
	<!-- Create a Websocket transport for the websocket dmeo -->
	<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
    正如你所看到的,传输连接器定义在<transportConnectors>节点中。你定义特定的连接器使用<transportConnector>元素开始。ActiveMQ可以通过监听不同的端口同时支持多种协议。连接器的配置必须定义唯一的名称和uri属性。在这个例子中,uri定义了网络协议和可选参数,通过这些配置,ActiveMQ就可以对外可连接了。openware协议中的disconveryUri属性是可选的,会在第4.3.1部分讲到
前面的片段定义了传输连接器。用这个配置文件启动
ActiveMQ(bin\activemq.bat start xbean:examples/conf/activemq-demo.xml
    你会看到控制台中出现下面的信息

INFO | Listening for connections at: tcp://127.0.0.1:61616
INFO | Connector openwire started
INFO | Listening for connections at: ssl://127.0.0.1:61617
INFO | Connector ssl started
INFO | Listening for connections at: stomp://127.0.0.1:61613
INFO | Connector stomp started
    从客户端来看,传输协议uri就是为了向服务端发送和接收消息创建的连接。发送和接收消息将在第7章讲到。但下面的代码片段应该足够演示传输协议uri在java应用中的使用:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    注意:上面的示例中,定义在ActiveMQ中的传输连接器uri被客户端用以创建到服务端的连接。在这个情况下,uri是tcp协议使用的,黑色字体标明的。
    注意,必须知道的重要的一点是我们即可以服务端也可以在客户端的uri中使用query部分配置连接参数。通过大多数的参数可以应用于客户端和服务端。但有一些参数只能用于其中的一端。所以在使用特定的查询参数之前要检查下协议说明书。
    对配置传输连接器有了基本的理解后,知道和理解ActiveMQ中的可用的传输连接器是重点。但在我们开始讲解特定的连接器之前,我们必须首选改造之前的股票投资系统示例,以使之应用不同的传输连接器。

4.2.2 改造股票投资系统示例

    第3章介绍了一个股票投递系统的示例,它使用ActiveMQ去发布和消费股票交换数据。那时,我们使用的固定的标准连接器URI,因为我们想让这个引入示例尽可能地简单。在这章中,我们通过这个示例使用每个协议,介绍并并验证他们。因此,我们需要修改这个示例使之可以用上任何一个协议。
清单4.1是一个示例publisher的main方法的修改版本。

public static void main(String[] args) throws JMSException {
	if (args.length == 0) {
		System.err.println("Please define connection URI!");
		return;
	}

	//define connection URI
	Publisher publisher = new Publisher(args[0]);


	//extract topics from the rest of arguments
	String[] topics = new String[args.length - 1];
	System.arraycopy(args, 1, topics, 0, args.length - 1);
	while (total < 1000) {
		for (int i = 0; i < count; i++) {
			publisher.sendMessage(topics);
		}
		total += count;
		System.out.println("Published '" + count + "' of '" + total + "' price messages");
		try {
		  Thread.sleep(1000);
		} catch (InterruptedException x) {
		}
	  }
	publisher.close();
}

上面的代码使用main方法的第一个参数接收连接器URI,从剩余的参数中提取了topic名称。现在这个示例可以这么运行了:
java org.apache.activemq.book.ch4.Publisher tcp://localhost:61616 CSCO ORCL
输出:
java org.apache.activemq.book.ch4.Publisher tcp://localhost:61616 CSCO ORCL
Sending: {price=18.515499433369747, stock=ORCL, offer=18.534014932803114, up=true} on destination: topic://STOCKS.ORCL
Sending: {price=18.48134317726199, stock=ORCL, offer=18.49982452043925, up=false} on destination: topic://STOCKS.ORCL
Sending: {price=18.504622991003643, stock=ORCL, offer=18.523127613994646, up=true} on destination: topic://STOCKS.ORCL
Sending: {price=57.257781318557264, stock=CSCO, offer=57.31503909987581, up=true} on destination: topic://STOCKS.CSCO
Sending: {price=18.377290722946885, stock=ORCL, offer=18.39566801366983, up=false} on destination: topic://STOCKS.ORCL
Sending: {price=18.481635781087313, stock=ORCL, offer=18.5001174168684, up=true} on destination: topic://STOCKS.ORCL
Sending: {price=57.38708379410251, stock=CSCO, offer=57.444470877896606, up=true} on destination: topic://STOCKS.CSCO
Sending: {price=57.474779874870904, stock=CSCO, offer=57.53225465474577, up=true} on destination: topic://STOCKS.CSCO
Sending: {price=18.468484063224288, stock=ORCL, offer=18.486952547287512, up=false} on destination: topic://STOCKS.ORCL
Sending: {price=18.31812579496238, stock=ORCL, offer=18.33644392075734, up=false} on destination: topic://STOCKS.ORCL
Published '10' of '10' price messages

现在已经给publisher类的main方法加了一个参数,用以连接到正确服务端的url。
相同的原理也可以应用于消费者。下面的清单中,你会看到消费者的main方法也做了类似的修改。
    
public static void main(String[] args) throws JMSException {
	if (args.length == 0) {
		System.err.println("Please define connection URI!");
		return;
	}
	
	//define connection URI
	Consumer consumer = new Consumer(args[0]);
	
	//extract topics from the rest of arguments
	String[] topics = new String[args.length - 1];
	System.arraycopy(args, 1, topics, 0, args.length - 1);
	for (String stock : topics) {
		Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
		MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);
		messageConsumer.setMessageListener(new Listener());
	}
}
java org.apache.activemq.book.ch4.Consumer tcp://localhost:61616 CSCO ORCL
ORCL	70.79	70.86	up
ORCL	70.77	70.84	down
ORCL	70.57	70.64	down
CSCO	98.02	98.11	down
ORCL	70.14	70.21	down
ORCL	69.86	69.93	down
CSCO	98.82	98.92	up
CSCO	98.20	98.30	down
CSCO	98.63	98.73	up
ORCL	69.78	69.85	down    

    现在,生产者和消息者之间的消息流与之前示例中的没有什么别,修改之后,示例就支持运行于多个协议了。现在让我们深入学习下各个特定的传输连接器。下面的部分将会看到如果你想经由网络连接到服务端你会有哪些选择。