Class WmMessageConsumerImpl
- java.lang.Object
-
- com.webmethods.locks.Node
-
- com.webmethods.jms.impl.WmMessageConsumerImpl
-
- All Implemented Interfaces:
ReplyCb
,WmMessageConsumer
,javax.jms.MessageConsumer
- Direct Known Subclasses:
WmClusterMessageConsumerImpl
,WmQueueReceiverImpl
,WmTopicSubscriberImpl
public class WmMessageConsumerImpl extends Node implements WmMessageConsumer, ReplyCb
-
-
Field Summary
Fields Modifier and Type Field Description protected boolean
_closed
protected WmDestinationImpl
_destination
protected java.lang.String
_durableName
protected java.lang.String
_filter
protected boolean
_logApi
protected int
_maxReceive
protected javax.jms.MessageListener
_messageListener
protected WmConsumerQueue
_messageQueue
protected boolean
_noLocal
protected ProtocolHandler
_protocolHandler
protected java.lang.String
_selector
protected WmSessionImpl
_session
protected int
_sessionID
protected java.util.HashMap<WmXid,java.lang.Long>
_suspendedTransactions
-
Fields inherited from class com.webmethods.locks.Node
_lockManager
-
-
Constructor Summary
Constructors Modifier Constructor Description protected
WmMessageConsumerImpl(WmSessionImpl session, boolean registerAutoAck)
Default contructor called by subclasses.protected
WmMessageConsumerImpl(WmSessionImpl session, WmDestinationImpl destination, java.lang.String durableName, java.lang.String selector, boolean noLocal)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description void
acknowledge()
Acknowledge all messages in the acknowledge list.void
addToPreAckList(WmMessageImpl message)
Add a message to the acknowledgement list.void
autoAcknowledge(WmMessageImpl message)
Acknowledge the given sequence number.long
available()
Gets the number of messages on the Broker available to the consumer.long
availableOnServer()
This is added for Cluster supportvoid
cancelRequestEvents()
Cancel the pending request for events.protected void
checkState()
protected void
checkSynchronous()
void
clear()
Clears all available messages to the consumer from the Broker.void
clientAcknowledge(WmMessageImpl message)
Add a message to the acknowledgement list.void
close()
Closes the message consumer.void
close(boolean force, boolean remove)
void
commit(long transactionID)
Commit a specific transaction.void
commit(WmMessageImpl message)
Commits a transaction under which the message is received.protected java.lang.String
createFilter(WmDestinationImpl destination, java.lang.String selector, boolean noLocal)
Create a Broker filter from a JMS selector.protected void
deliver(WmMessageImpl message)
Delivers a message to the message consumer.void
dupsOkAcknowledge(WmMessageImpl message)
Set the acknowledge through sequence number.java.util.LinkedList<WmMessageImpl>
getClientAcknowledgeList()
javax.jms.Destination
getDestination()
Gets theDestination
associated with this consumer.java.lang.String
getDurableName()
java.io.InputStream
getInputStream()
Returns an InputStream that reads data from a MessageConsumer.int
getMaxReceive()
Get the maximum receive value for this message consumer.javax.jms.MessageListener
getMessageListener()
Gets the message consumer'sMessageListener
.WmConsumerQueue
getMessageQueue()
java.lang.String
getMessageSelector()
Gets this message consumer's message selector expression.javax.jms.Session
getSession()
boolean
getTransactionAckCount()
boolean
isNoLocal()
void
onMessage()
Process any synchronously received messages.void
onMessage(WmMessageImpl message)
Called by the session dispatcher to process messages received asynchronously.void
onReply(Notification notification)
Receives asynchronous notification that an event has arrived from the Broker.void
preAckClientList()
Acknowledge all messages in the acknowledge list.void
preAcknowledge(WmMessageImpl message)
Pre-Acknowledge the given UUID number.javax.jms.Message
receive()
Receives the next message produced for this message consumer.javax.jms.Message
receive(long timeout)
Receives the next message that arrives within the specified timeout interval.javax.jms.Message
receiveNoWait()
Receives the next message if one is immediately available.javax.jms.Message
removeMessage()
javax.jms.Message
removeMessageNoWait()
javax.jms.Message
removeMessageWithWait(long timeout)
protected void
requestEvents()
void
requestEvents(int max, boolean synchronous)
Issue a request for events to the Broker.protected static void
requestEvents(WmMessageConsumerImpl consumer, boolean synchronous)
void
rollback(long transactionID)
Rolls back a transaction specified with the transactionID.void
setFilter(java.lang.String filter)
void
setMaxReceive(int maxReceive)
Set the maximum receive value for this message consumer.void
setMessageListener(javax.jms.MessageListener listener)
Sets the message consumer'sMessageListener
.void
setMessageListener(javax.jms.MessageListener listener, boolean setonSession, WmSessionQueue queue)
void
setMessageQueue(WmConsumerQueue queue)
Added to change the implementation of the WmConsumerQueue to the WmConsumerQueueImpl in case of cluster consumersvoid
setTransactionID(long transactionID)
Set the current transaction ID.void
setTransactionID(WmXid xid, long transactionID)
Associate a transaction ID with an external ID.void
setTransactionTimeout(long transactionID, int timeout)
void
start()
Starts synchronous message delivery.void
startLocalTransaction()
Starts a local transaction.void
stop()
Stops synchronous message delivery.protected void
stopAutoAcknowledger()
java.lang.String
toString()
void
transactedAcknowledge(WmMessageImpl message)
Acknowledge the given sequence number.void
transactionComplete()
-
Methods inherited from class com.webmethods.locks.Node
getLockManager, getParent
-
-
-
-
Field Detail
-
_protocolHandler
protected ProtocolHandler _protocolHandler
-
_session
protected WmSessionImpl _session
-
_destination
protected WmDestinationImpl _destination
-
_messageQueue
protected WmConsumerQueue _messageQueue
-
_selector
protected java.lang.String _selector
-
_filter
protected java.lang.String _filter
-
_messageListener
protected javax.jms.MessageListener _messageListener
-
_durableName
protected java.lang.String _durableName
-
_noLocal
protected boolean _noLocal
-
_logApi
protected boolean _logApi
-
_suspendedTransactions
protected java.util.HashMap<WmXid,java.lang.Long> _suspendedTransactions
-
_maxReceive
protected int _maxReceive
-
_closed
protected boolean _closed
-
_sessionID
protected int _sessionID
-
-
Constructor Detail
-
WmMessageConsumerImpl
protected WmMessageConsumerImpl(WmSessionImpl session, WmDestinationImpl destination, java.lang.String durableName, java.lang.String selector, boolean noLocal) throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
WmMessageConsumerImpl
protected WmMessageConsumerImpl(WmSessionImpl session, boolean registerAutoAck)
Default contructor called by subclasses.
-
-
Method Detail
-
setMessageQueue
public void setMessageQueue(WmConsumerQueue queue)
Added to change the implementation of the WmConsumerQueue to the WmConsumerQueueImpl in case of cluster consumers- Parameters:
queue
-
-
getMessageSelector
public java.lang.String getMessageSelector() throws javax.jms.JMSException
Gets this message consumer's message selector expression.- Specified by:
getMessageSelector
in interfacejavax.jms.MessageConsumer
- Returns:
- this message consumer's message selector, or null if no message selector exists for the message consumer (that is, if the message selector was not set or was set to null or the empty string)
- Throws:
javax.jms.JMSException
- if the JMS provider fails to get the message selector due to some internal error.
-
getMessageListener
public javax.jms.MessageListener getMessageListener() throws javax.jms.JMSException
Gets the message consumer'sMessageListener
.- Specified by:
getMessageListener
in interfacejavax.jms.MessageConsumer
- Returns:
- the listener for the message consumer, or null if no listener is set
- Throws:
javax.jms.JMSException
- if the JMS provider fails to get the message listener due to some internal error.- See Also:
MessageConsumer.setMessageListener(javax.jms.MessageListener)
-
setMessageListener
public void setMessageListener(javax.jms.MessageListener listener) throws javax.jms.JMSException
Sets the message consumer'sMessageListener
.Setting the message listener to null is the equivalent of unsetting the message listener for the message consumer.
The effect of calling
MessageConsumer.setMessageListener
while messages are being consumed by an existing listener or the consumer is being used to consume messages synchronously is undefined.- Specified by:
setMessageListener
in interfacejavax.jms.MessageConsumer
- Parameters:
listener
- the listener to which the messages are to be delivered- Throws:
javax.jms.JMSException
- if the JMS provider fails to set the message listener due to some internal error.
-
setMessageListener
public void setMessageListener(javax.jms.MessageListener listener, boolean setonSession, WmSessionQueue queue)
-
receive
public javax.jms.Message receive() throws javax.jms.JMSException
Receives the next message produced for this message consumer.This call blocks indefinitely until a message is produced or until this message consumer is closed.
If this
receive
is done within a transaction, the consumer retains the message until the transaction commits.- Specified by:
receive
in interfacejavax.jms.MessageConsumer
- Returns:
- the next message produced for this message consumer, or null if this message consumer is concurrently closed
- Throws:
javax.jms.JMSException
- if the JMS provider fails to receive the next message due to some internal error.
-
receive
public javax.jms.Message receive(long timeout) throws javax.jms.JMSException
Receives the next message that arrives within the specified timeout interval.This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A
timeout
of zero never expires, and the call blocks indefinitely.- Specified by:
receive
in interfacejavax.jms.MessageConsumer
- Parameters:
timeout
- the timeout value (in milliseconds)- Returns:
- the next message produced for this message consumer, or null if the timeout expires or this message consumer is concurrently closed
- Throws:
javax.jms.JMSException
- if the JMS provider fails to receive the next message due to some internal error.
-
receiveNoWait
public javax.jms.Message receiveNoWait() throws javax.jms.JMSException
Receives the next message if one is immediately available.- Specified by:
receiveNoWait
in interfacejavax.jms.MessageConsumer
- Returns:
- the next message produced for this message consumer, or null if one is not available
- Throws:
javax.jms.JMSException
- if the JMS provider fails to receive the next message due to some internal error.
-
close
public void close() throws javax.jms.JMSException
Closes the message consumer.Since a provider may allocate some resources on behalf of a
MessageConsumer
outside the Java virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually reclaim these resources may not be timely enough.This call blocks until a
receive
or message listener in progress has completed. A blocked message consumerreceive
call returns null when this message consumer is closed.- Specified by:
close
in interfacejavax.jms.MessageConsumer
- Specified by:
close
in interfaceWmMessageConsumer
- Throws:
javax.jms.JMSException
- if the JMS provider fails to close the consumer due to some internal error.
-
setMaxReceive
public void setMaxReceive(int maxReceive) throws javax.jms.JMSException
Set the maximum receive value for this message consumer. The message consumer will accumulate at most this number of messages at a time.- Specified by:
setMaxReceive
in interfaceWmMessageConsumer
- Parameters:
maxReceive
- the maximum number of messages to be accumulated by this message consumer- Throws:
javax.jms.JMSException
- if the JMS provider fails to set the maximum receive value.- See Also:
getMaxReceive()
-
getMaxReceive
public int getMaxReceive() throws javax.jms.JMSException
Get the maximum receive value for this message consumer. The message consumer will accumulate at most this number of messages at a time.- Specified by:
getMaxReceive
in interfaceWmMessageConsumer
- Returns:
- the maximum number of messages to be accumulated by this message consumer
- Throws:
javax.jms.JMSException
- if the JMS provider fails to get the maximum receive value.- See Also:
setMaxReceive(int)
-
getDestination
public javax.jms.Destination getDestination() throws javax.jms.JMSException
Gets theDestination
associated with this consumer.- Specified by:
getDestination
in interfaceWmMessageConsumer
- Returns:
- this consumer's
Destination
- Throws:
javax.jms.JMSException
- if the JMS provider fails to get the destination for this consumer due to some internal error.
-
available
public long available() throws javax.jms.JMSException
Gets the number of messages on the Broker available to the consumer. This is an approximate number in that some of the messages may be expired or don't match the consumer's message selector or are already pre-acknowledged..- Specified by:
available
in interfaceWmMessageConsumer
- Returns:
- the number of messages available to the consumer
- Throws:
javax.jms.JMSException
- if the JMS provider fails to get the number of available messages.
-
availableOnServer
public long availableOnServer() throws javax.jms.JMSException
This is added for Cluster support- Returns:
- the number of messages available to the consumer
- Throws:
javax.jms.JMSException
- if the JMS provider fails to get the number of available messages.
-
clear
public void clear() throws javax.jms.JMSException
Clears all available messages to the consumer from the Broker.- Specified by:
clear
in interfaceWmMessageConsumer
- Throws:
javax.jms.JMSException
- if the JMS provider fails to clear the consumer's messages.
-
getInputStream
public java.io.InputStream getInputStream() throws javax.jms.JMSException
Returns an InputStream that reads data from a MessageConsumer.- Specified by:
getInputStream
in interfaceWmMessageConsumer
- Returns:
- an InputStream
- Throws:
javax.jms.JMSException
- if the MessageConsumer's InputStream could not be returned.
-
setFilter
public void setFilter(java.lang.String filter) throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
checkState
protected void checkState() throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
checkSynchronous
protected void checkSynchronous() throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
getSession
public javax.jms.Session getSession()
-
close
public void close(boolean force, boolean remove) throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
stopAutoAcknowledger
protected void stopAutoAcknowledger() throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
deliver
protected void deliver(WmMessageImpl message) throws javax.jms.JMSException
Delivers a message to the message consumer.- Parameters:
message
- the message to deliver- Throws:
javax.jms.JMSException
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
start
public void start() throws javax.jms.JMSException
Starts synchronous message delivery. Called byWmSessionImpl.start()
.- Throws:
javax.jms.JMSException
-
stop
public void stop() throws javax.jms.JMSException
Stops synchronous message delivery. Called byWmSessionImpl.stop()
.- Throws:
javax.jms.JMSException
-
onMessage
public void onMessage(WmMessageImpl message) throws javax.jms.JMSException
Called by the session dispatcher to process messages received asynchronously.- Parameters:
message
- the message- Throws:
javax.jms.JMSException
-
onMessage
public void onMessage() throws javax.jms.JMSException
Process any synchronously received messages. This can happen if a message listener is set after the receive method was called.- Throws:
javax.jms.JMSException
-
dupsOkAcknowledge
public void dupsOkAcknowledge(WmMessageImpl message) throws javax.jms.JMSException
Set the acknowledge through sequence number. Used by the session when the acknowledge mode is DUPS_OK_ACKNOWLEDGE.- Parameters:
message
- the message to acknowledge through- Throws:
javax.jms.JMSException
-
clientAcknowledge
public void clientAcknowledge(WmMessageImpl message)
Add a message to the acknowledgement list. Used by the session when the acknowledge mode is CLIENT_ACKNOWLEDGE or the session is transacted.- Parameters:
message
- the message to add
-
acknowledge
public void acknowledge() throws javax.jms.JMSException
Acknowledge all messages in the acknowledge list.- Throws:
javax.jms.JMSException
-
autoAcknowledge
public void autoAcknowledge(WmMessageImpl message) throws javax.jms.JMSException
Acknowledge the given sequence number.- Parameters:
message
- the message to acknowledge.- Throws:
javax.jms.JMSException
-
transactedAcknowledge
public void transactedAcknowledge(WmMessageImpl message) throws javax.jms.JMSException
Acknowledge the given sequence number.- Parameters:
message
- the message to acknowledge.- Throws:
javax.jms.JMSException
-
preAcknowledge
public void preAcknowledge(WmMessageImpl message) throws javax.jms.JMSException
Pre-Acknowledge the given UUID number.- Parameters:
message
- the message to acknowledge.- Throws:
javax.jms.JMSException
-
commit
public void commit(long transactionID) throws javax.jms.JMSException
Commit a specific transaction.- Parameters:
transactionID
- the transaction to commit- Throws:
javax.jms.JMSException
-
commit
public void commit(WmMessageImpl message) throws javax.jms.JMSException
Commits a transaction under which the message is received.- Parameters:
message
-- Throws:
javax.jms.JMSException
-
setTransactionTimeout
public void setTransactionTimeout(long transactionID, int timeout) throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
startLocalTransaction
public void startLocalTransaction() throws javax.jms.JMSException
Starts a local transaction.- Throws:
javax.jms.JMSException
-
transactionComplete
public void transactionComplete() throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
setTransactionID
public void setTransactionID(WmXid xid, long transactionID) throws javax.jms.JMSException
Associate a transaction ID with an external ID.- Parameters:
xid
-transactionID
-- Throws:
javax.jms.JMSException
-
setTransactionID
public void setTransactionID(long transactionID) throws javax.jms.JMSException
Set the current transaction ID.- Throws:
javax.jms.JMSException
-
rollback
public void rollback(long transactionID) throws javax.jms.JMSException
Rolls back a transaction specified with the transactionID. This typically is used to rollback transactions which were conducted during ASF mode.- Throws:
javax.jms.JMSException
- if the JMS provider fails to roll back the transaction due to some internal error.javax.jms.IllegalStateException
- if the method is not called by a transacted session.
-
createFilter
protected java.lang.String createFilter(WmDestinationImpl destination, java.lang.String selector, boolean noLocal) throws javax.jms.JMSException
Create a Broker filter from a JMS selector.- Throws:
javax.jms.JMSException
-
requestEvents
protected static void requestEvents(WmMessageConsumerImpl consumer, boolean synchronous) throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
requestEvents
protected void requestEvents() throws javax.jms.JMSException
- Throws:
javax.jms.JMSException
-
requestEvents
public void requestEvents(int max, boolean synchronous) throws javax.jms.JMSException
Issue a request for events to the Broker.- Parameters:
max
- maximum number of messages to receive from the Broker acknowledge mode, or -1 to acknowledge nothing- Throws:
javax.jms.JMSException
- if the request could not be sent
-
cancelRequestEvents
public void cancelRequestEvents() throws javax.jms.JMSException
Cancel the pending request for events.- Throws:
javax.jms.JMSException
- if the pending request cannot be canceled
-
onReply
public void onReply(Notification notification)
Receives asynchronous notification that an event has arrived from the Broker.
-
getMessageQueue
public WmConsumerQueue getMessageQueue()
-
addToPreAckList
public void addToPreAckList(WmMessageImpl message)
Add a message to the acknowledgement list. Used by the session when the acknowledge mode is CLIENT_ACKNOWLEDGE or the session is transacted.- Parameters:
message
- the message to add
-
preAckClientList
public void preAckClientList() throws javax.jms.JMSException
Acknowledge all messages in the acknowledge list.- Throws:
javax.jms.JMSException
-
removeMessage
public javax.jms.Message removeMessage()
-
removeMessageWithWait
public javax.jms.Message removeMessageWithWait(long timeout)
-
removeMessageNoWait
public javax.jms.Message removeMessageNoWait()
-
getTransactionAckCount
public boolean getTransactionAckCount()
-
getClientAcknowledgeList
public java.util.LinkedList<WmMessageImpl> getClientAcknowledgeList()
-
getDurableName
public java.lang.String getDurableName()
-
isNoLocal
public boolean isNoLocal()
-
-