ActiveMQ in Action-5.4 The JDBC message store- 高飞网

5.4 The JDBC message store

2016-05-26 11:54:50.0

5.4 JDBC消息存储

    灵活的ActiveMQ插件式消息存储API允许你选择不同的实现。最原始和和通用的存储实现是以JDBC来进行消息存储。

    这么多组织选择使用JDBC消息存储的原因是,它们对这种关系型数据库有很多使用经验。JDBC持久化很明显没有上面提到的消息存储在性能方面的优势。事实上真相是很多企业已经在关系型数据库上花费了大量金钱,因此他们想尽其所能充分使用它们。

    但是使用共享数据库是对于由多个服务端组成的master/slave(主从)拓扑结构非常之有用。当一个ActiveMQ服务端组配置使用一个共享数据库,它们都会尝试连接并获取一个表级锁的锁,但只有一个能成功并成功master。其余的服务端将变成slaves,并将进入等待状态,不再接收客户端连接直到master失败。这是个ActiveMQ常用的场景,将在第10章详细讲解。

    当使用JDBC消息存储,ActiveMQ使用的默认的JDBC驱动是Apache Derby。但其他的关系型数据也受支持。

5.4.1 JDBC消息存储支持的数据库

    几乎每种数据库都可以通过JDBC驱动使用。虽然下面不是一个完整的列表,JDBC存储有下面的可支持的数据库:
    Apache Derby
    MySQL
    Postgre SQL
    Oracle
    SQL Server
    Sybase
    Informix
    MaxDB

有些用户更喜欢使用关系型数据库,因为它能够从数据库中直接查询到消息。下面的部分将探讨这个话题。

使用Apache Derby ActiveMQ

    如前所述,Apache Derby是使用JDBC数据库时默认的数据库。不仅仅因为它是纯java写的,它还是嵌入式的。Derby提供了完整的特性集合,性能优秀,并提供了一个小脚本。但是使用ActiveMQ用户应该注意一个警示。那就是,Derby在JVM的GC中可能运行更为艰难。因为数据库中有太多的消息存储和删除动作,经验表明,Derby运行与他单独的JVM实例中,将能是ActiveMQ的性能更佳。原因是ActiveMQ和Debey不会再竞争相同的JVM资源了。

5.4.2 JDBC消息存储模式

    JDBC消息存储使用了包含三个table的模式。两个表用来存储消息,第三个用来作为一个锁表,确保同一时刻只有一个ActiveMQ服务端可以访问数据库。下面是表的详细描述。

    表格5.3中展示了消息表,默认情况下表名为ACTIVEMQ_MSGS,表定义如下

    表5.3 ACTIVEMQ_MSGS表中的列

列名
列名
列名
ID
INTEGER
用于查询消息的序列化ID
CONTAINER
VARCHAR(250)
消息的目的地
MSGID_PROD
VARCHAR(250)
消息生产者目的地
MSGID_SEQ
INTEGER
生产者的消息序列号. 这个字段与MSGID_PROD一起等价于JMSMessageID.
EXPIRATION
BIGINT
消息过期的毫秒时间
MSG
BLOB
消息自身的序列化

    queue和topic消息都会被分解并保存到ACTIVEMQ_MSGS表中。另有一个单独的表用来保存持久订阅者消息,一个ID表示持久订阅者接收到的最新的消息。这些信息由表5.4中的ACTIVEMQ_ACKS表来存储.

表5.4 ACTIVEMQ_ACKS表中的列
列名
列名
描述
CONTAINER
VARCHAR(250)
消息的目的地
SUB_DEST
VARCHAR(250)
持久订阅者的目的地(can be different from the container if using wildcards)
CLIENT_IDVARCHAR(250)
持久订阅者的client ID
SUB_NAME
VARCHAR(250)
持久订阅者的订阅名称
SELECTOR
VARCHAR(250)
持久订阅者的选择器
LAST_ACKED_ID
Integer
订阅者的接收到最新消息的序列ID
    对于持久订阅者,LAST_ACKED_ID序列是用于简单外键指向ACTIVEMQ_MSGS,并使持久订阅者的消息易于从ACTIVEMQ_MSGS查询到。
    锁定表,名为ACTIVEMQ_LOCK,用于确保同一时间只有一个ActiveMQ服务端实例可以访问数据库。如果ActiveMQ服务端不能获取到数据库锁,服务端将不能完全初始化,并一直等待锁被释放,或者自己关闭。表结构如下表5.5所示。

    表5.5 ACTIVEMQ_LOCK表的列

列名
默认类型
默认类型
ID
INTEGER
锁的唯一ID
Broker Name
VARCHAR(250)
持久锁的ActiveMQ服务端名称

    现在我们已经解析了JDBC消息存储使用的表结构。那么再看一下配置JDBC消息存储的一些示例。


5.4.3  配置JDBC消息存储

    配置默认的JDBC消息存储非常简单,如前所述,默认的JDBC消息存储使用Apache Derby,如下:

<beans>
    <broker brokerName="test-broker" persistent="true"
        xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter
                dataDirectory="activemq-data" />
        </persistenceAdapter>
    </broker>
</beans>

    上面的为ActiveMQ服务端的JDBC消息存储(默认使用Apache Derby)的配置设置了持久化适配器,并设置了嵌入式Apache Derby的数据使用路径。

    启动:bin\activemq.bat start xbean:d:/software/apache-activemq-5.13.2/conf/activemq_jdbc.xml

    默认情况下,上面的配置会因为获取数据库的类库而报如下的错:

ERROR: java.lang.RuntimeException: Failed to execute start task. Reason: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in file [d:\software\apache-activemq-5.13.2\conf\activemq_jdbc.xml]: Invocation of init method failed; nested exception is java.lang.NoClassDefFoundError: org/apache/derby/jdbc/EmbeddedDataSource

     因此需要手工将类库放到activemq的lib目录下,可到Apache Derby: Downloads下面下载jar包:derby.jar,运行以后,会发现在配置的目录下生成下面的数据文件:

              

    

    JDBC持久化适配器(JDBC消息存储接口)的关键属性是数据库属性。该属性定义了一个工厂,而该工厂中会创建到关系型数据库的连接。配置数据源对象使JDBC消息存储适配器使用物理的数据而不是默认的。下面是一个ActiveMQ使用中使用Mysql数据源的JDBC配置示例:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" persistent="true"
        xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds" />
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url"
            value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" />
        <property name="username" value="activemq" />
        <property name="password" value="activemq" />
        <property name="maxActive" value="200" />
        <property name="poolPreparedStatements" value="true" />
    </bean>
</beans>

    上面的配置使用了Apache Commons DBCP数据源封装MySQL数据驱动用于连接池。在该示例中,驱动类名(driverClassName)是JDBC使用的驱动的类名称。你还可以配置一些直接作用到数据驱动本身的属性。例如,maxActiveMySQL数据源连接的属性,告诉数据源一次最多开启多少数据源。

    仅为了比较之目的,这里展示了使用Oracle数据库的配置示例:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" persistent=true
        xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#oracle-ds" />
        </persistenceAdapter>
    </broker>
    <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
        <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB" />
        <property name="username" value="scott" />
        <property name="password" value="tiger" />
        <property name="maxActive" value="200" />
        <property name="poolPreparedStatements" value="true" />
    </bean>
</beans>

    该示例使用了Apache Commons  DBCP数据源封装了Oracle的JDBC驱动用于连接池。

    现在几个JDBC消息存储的示例已经都展示过了,你可能会问,什么时候使用哪种这种持久化最好呢?

    注:译者在使用mysql作为数据源存储数据时,起先遇到了下面的问题:

java.lang.ClassNotFoundException: org.apache.commons.dbcp.BasicDataSource
java.lang.ClassNotFoundException: org.apache.commons.pool.impl.GenericObjectPool

    即还是缺少类库,于是去maven中下载了dbcp连接池的两个jar和一个mysql数据库驱动jar
    commons-dbcp-1.2.2.jar、commons-pool-1.6.jar、mysql-connector-java-5.1.18-bin.jar

    数据库方面我只建了一个数据库,而没有建表,发现运行activemq服务以后,会自动建表,这个比较爽!
    

下面是保存消息的数据表:


5.4.4 使用JDBC消息存储与ActiveMQ日志

    即使JDBC消息存储的性能不可观,但可通过使用ActiveMQ日志改善。日志确保了JSM事务的一致性。因为它结合了使用缓存技术的快速写入,这显著地提供了ActiveMQ服务端的性能。

    这里有一个使用JDBC(又称日志式JDBC)日志的示例配置。这个案例中,Apache Derby开始使用了。

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceFactory>
            <journalPersistenceAdapterFactory
                journalLogFiles="4" 
				journalLogFileSize="32768"
                useJournal="true" 
				useQuickJournal="true" 
				dataSource="#derby-ds"
                dataDirectory="activemq-data" />
        </persistenceFactory>
    </broker>
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb" />
        <property name="createDatabase" value="create" />
    </bean>
</beans>


    这种日志可以与任何数据源一起使用,但更重要的是知道什么时候不应使用它。

    首先日志比起标准的JDBC消息存储,具有相当可观的性能优势。尤其是当JDBC数据库与服务端在相同的机器上协同工作时。然而只有一种情况不能使用日志,就是在主从(master/slave)的共享数据库配置中。因为日志中来自主库的消息,在被提交到数据库之前,可能在本地存储。在配置中使用日志时,因为日志不会恢复,如果主库失败就有可能导致消息丢失。

    我们通过示例配置覆盖到了关系数据库的消息存储。下一部分我看一下内存存储,这并不是持久化消息。