异步消息处理的办法——MDB
在我们的产品中,最核心的模块就是关于EPCIS事件的捕获(Capture),原理上很简单,RFID设备捕获到一个event,然后将event放入WMQ中的Queue,我们的server上有一个MDB用于实时监听该Queue,如果发现了新的message,则处理,最终存入DB的EVENTSCHEMA下。
通常测试中可以模拟这样一个事件的产生,下例中TEST.xml就是一个event事件:
submitEvent.sh -queue myeventq -source TEST.xml. |
submitEvent.sh脚本:
SCRIPT_DIR=`dirname $0` if [ -n "$RFIDIC_HOME" -a -d $RFIDIC_HOME"/bin" ];then . $RFIDIC_HOME/bin/rfidic-env.sh else . $SCRIPT_DIR/rfidic-env.sh fi checkLogs appendClassPathWithWASJars #Change SSL settings in rfidic-env.sh file under the sslWASSettings function SSL_SETTINGS=`sslWASSettings` java -classpath $CLASSPATH:$WAS_HOME/lib/j2ee.jar -DRFIDIC_HOME=$RFIDIC_HOME $SSL_SETTINGS com.ibm.rfidic.utils.mq.PutQueue $* |
com.ibm.rfidic.utils.mq.PutQueue中的putQueue方法:
public static void putQueue(String logicalQueueName, String message) throws IOException, JMSException { MessageQueueFactory mqf = MessageQueueFactory.getInstance(); RFIDICMessageQueue mq; try { mq = mqf.getMessageQueueConnection(logicalQueueName); if (mq == null) { l.error(RFIDICMessages.getMessage(4056, logicalQueueName).getIdAndMessage()); } if (source == null) { mq.sendMessage(message); break label101: } Properties jmsHeaders = new Properties(); jmsHeaders.put("RFIDIC_SOURCE", source); mq.sendMessage(message, jmsHeaders); } catch (JMSException e1) { l.error(RFIDICMessages.getMessage(4057, e1).getIdAndMessage()); throw e1; } label101: IMessage msg = RFIDICMessages.getInstance().getMessage(120027); System.out.println(msg.getIdAndMessage()); } |
可以看出利用JMS的API将解析后的event事件放入了message queue。
后台的MDB:
public class CaptureMDBBean implements MessageDrivenBean, MessageListener { private static final String copyright = "(c) Copyright IBM Corporation 2006."; private static final Logger m_logger = Logger.getLogger(CaptureMDBBean.class); private MessageDrivenContext fMessageDrivenCtx; private CaptureMgr capMgr = null; private Capture capture = null; public MessageDrivenContext getMessageDrivenContext() { return this.fMessageDrivenCtx; } public void setMessageDrivenContext(MessageDrivenContext ctx) { this.fMessageDrivenCtx = ctx; } public void ejbCreate() { m_logger.debug("ejbCreate"); } private void initCapture() { m_logger.debug("initCapture"); String filename = RFIDICConfig.getETCDir().getAbsolutePath() + System.getProperty("file.separator") + "Capture.xml"; try { this.capMgr = CaptureMgr.getInstance(); this.capture = this.capMgr.getCapture(filename); } catch (Exception e) { m_logger.error(e); } } public void onMessage(Message msg) { try { DatasourceFactory.getContainerConnection(); if (this.capture == null) { initCapture(); } String xml = null; if (msg instanceof TextMessage) { xml = ((TextMessage)msg).getText(); } else { IMessage errmsg = RFIDICMessages.getMessage(21019, new String[] { msg.getClass().getName(), msg.getJMSMessageID(), new Date(msg.getJMSTimestamp()).toString() }); m_logger.error(errmsg); throw new Exception(errmsg.getIdAndMessage()); } String source = msg.getStringProperty("RFIDIC_SOURCE"); this.capture.captureMessage(xml, source); } catch (Exception e) { m_logger.error(e); this.fMessageDrivenCtx.setRollbackOnly(); } } public void ejbRemove() { m_logger.debug("ejbRemove"); } } |
最核心的方法就是onMessage,它是用来处理发现message后的处理方法。这样的一个MDB我们在利用RAD开发的时候要选择EJB2.X规范,这样在图形化的界面里就可以选择一个对应的WAS上的MDB。
下面介绍WAS上要部署的东西:
1. JMS Provider:
2. Queue 连接工厂:
3. Queue连接工厂JNDI对应的实际MQ地址,Queue manager名称的映射
5. WAS上Queue JNDI名与实际Queue名字的映射:
6. WAS上MDB的配置,将一个queue连接工厂与一个queue定义为MDB要监听的queue。