异步消息处理的办法——MDB

在我们的产品中,最核心的模块就是关于EPCIS事件的捕获(Capture),原理上很简单,RFID设备捕获到一个event,然后将event放入WMQ中的Queue,我们的server上有一个MDB用于实时监听该Queue,如果发现了新的message,则处理,最终存入DB的EVENTSCHEMA下。

通常测试中可以模拟这样一个事件的产生,下例中TEST.xml就是一个event事件:

submitEvent.sh -queue myeventq -source TEST.xml.

submitEvent.sh脚本:

SCRIPT_DIR=`dirname $0`
if [ -n "$RFIDIC_HOME" -a -d $RFIDIC_HOME"/bin" ];then
. $RFIDIC_HOME/bin/rfidic-env.sh
else
. $SCRIPT_DIR/rfidic-env.sh
fi
 
checkLogs
appendClassPathWithWASJars
 
#Change SSL settings in rfidic-env.sh file under the sslWASSettings function
SSL_SETTINGS=`sslWASSettings`
 
java -classpath $CLASSPATH:$WAS_HOME/lib/j2ee.jar -DRFIDIC_HOME=$RFIDIC_HOME $SSL_SETTINGS com.ibm.rfidic.utils.mq.PutQueue $*

com.ibm.rfidic.utils.mq.PutQueue中的putQueue方法:

public static void putQueue(String logicalQueueName, String message) throws IOException, JMSException {
MessageQueueFactory mqf = MessageQueueFactory.getInstance();
RFIDICMessageQueue mq;
try
{
mq = mqf.getMessageQueueConnection(logicalQueueName);
if (mq == null) {
l.error(RFIDICMessages.getMessage(4056, logicalQueueName).getIdAndMessage());
}
if (source == null) {
mq.sendMessage(message); break label101:
}
Properties jmsHeaders = new Properties();
jmsHeaders.put("RFIDIC_SOURCE", source);
mq.sendMessage(message, jmsHeaders);
}
catch (JMSException e1) {
l.error(RFIDICMessages.getMessage(4057, e1).getIdAndMessage());
throw e1;
}
 
label101: IMessage msg = RFIDICMessages.getInstance().getMessage(120027);
System.out.println(msg.getIdAndMessage());
}

可以看出利用JMS的API将解析后的event事件放入了message queue。

后台的MDB:

public class CaptureMDBBean
implements MessageDrivenBean, MessageListener
{
private static final String copyright = "(c) Copyright IBM Corporation 2006.";
private static final Logger m_logger = Logger.getLogger(CaptureMDBBean.class);
private MessageDrivenContext fMessageDrivenCtx;
private CaptureMgr capMgr = null;
private Capture capture = null;
 
public MessageDrivenContext getMessageDrivenContext()
{
return this.fMessageDrivenCtx;
}
 
public void setMessageDrivenContext(MessageDrivenContext ctx)
{
this.fMessageDrivenCtx = ctx;
}
 
public void ejbCreate()
{
m_logger.debug("ejbCreate");
}
 
private void initCapture() {
m_logger.debug("initCapture");
 
String filename = RFIDICConfig.getETCDir().getAbsolutePath() + System.getProperty("file.separator") + "Capture.xml";
try
{
this.capMgr = CaptureMgr.getInstance();
this.capture = this.capMgr.getCapture(filename);
}
catch (Exception e) {
m_logger.error(e);
}
}
 
public void onMessage(Message msg)
{
try
{
DatasourceFactory.getContainerConnection();
 
if (this.capture == null) {
initCapture();
}
 
String xml = null;
if (msg instanceof TextMessage) {
xml = ((TextMessage)msg).getText();
} else {
IMessage errmsg = RFIDICMessages.getMessage(21019, new String[] { msg.getClass().getName(),
msg.getJMSMessageID(), new Date(msg.getJMSTimestamp()).toString() });
m_logger.error(errmsg);
throw new Exception(errmsg.getIdAndMessage());
}
 
String source = msg.getStringProperty("RFIDIC_SOURCE");
 
this.capture.captureMessage(xml, source);
} catch (Exception e) {
m_logger.error(e);
this.fMessageDrivenCtx.setRollbackOnly();
}
}
 
public void ejbRemove()
{
m_logger.debug("ejbRemove");
}
}

最核心的方法就是onMessage,它是用来处理发现message后的处理方法。这样的一个MDB我们在利用RAD开发的时候要选择EJB2.X规范,这样在图形化的界面里就可以选择一个对应的WAS上的MDB。

下面介绍WAS上要部署的东西:

1. JMS Provider:

2. Queue 连接工厂:

3. Queue连接工厂JNDI对应的实际MQ地址,Queue manager名称的映射

4. Queue

5. WAS上Queue JNDI名与实际Queue名字的映射:

6. WAS上MDB的配置,将一个queue连接工厂与一个queue定义为MDB要监听的queue。

Leave a Comment.