activemq-cpp-3.9.5
ActiveMQSessionKernel.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
19 #define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
20 
21 #include <cms/Session.h>
22 #include <cms/ExceptionListener.h>
23 
24 #include <activemq/util/Config.h>
25 #include <activemq/util/Usage.h>
42 
43 #include <decaf/lang/Pointer.h>
44 #include <decaf/util/ArrayList.h>
45 #include <decaf/util/Properties.h>
48 
49 #include <string>
50 #include <memory>
51 
52 namespace activemq {
53 namespace core {
54 
55  class ActiveMQConnection;
56  class ActiveMQConsumer;
57  class ActiveMQProducer;
58  class ActiveMQSessionExecutor;
59 
60 namespace kernels {
61 
64 
65  class SessionConfig;
66 
67  class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher {
68  private:
69 
71 
72  protected:
73 
74  SessionConfig* config;
75 
80 
85 
90 
96 
100  std::auto_ptr<ActiveMQSessionExecutor> executor;
101 
106 
111 
116 
121 
126 
127  private:
128 
130  ActiveMQSessionKernel& operator=(const ActiveMQSessionKernel&);
131 
132  public:
133 
137  const decaf::util::Properties& properties);
138 
139  virtual ~ActiveMQSessionKernel();
140 
145  virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
146 
150  virtual void start();
151 
155  virtual void stop();
156 
161  bool isStarted() const;
162 
163  virtual bool isAutoAcknowledge() const {
164  return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
165  }
166 
167  virtual bool isDupsOkAcknowledge() const {
168  return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
169  }
170 
171  virtual bool isClientAcknowledge() const {
172  return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
173  }
174 
175  virtual bool isIndividualAcknowledge() const {
176  return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
177  }
178 
182  void fire(const exceptions::ActiveMQException& ex);
183 
184  public: // Methods from ActiveMQMessageDispatcher
185 
190  virtual void dispatch(const Pointer<MessageDispatch>& message);
191 
192  public: // Implements Methods
193 
194  virtual void close();
195 
196  virtual void commit();
197 
198  virtual void rollback();
199 
200  virtual void recover();
201 
202  virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination);
203 
204  virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
205  const std::string& selector);
206 
207  virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
208  const std::string& selector,
209  bool noLocal);
210 
211  virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination,
212  const std::string& name,
213  const std::string& selector,
214  bool noLocal = false);
215 
216  virtual cms::MessageProducer* createProducer(const cms::Destination* destination);
217 
218  virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue);
219 
220  virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
221 
222  virtual cms::Queue* createQueue(const std::string& queueName);
223 
224  virtual cms::Topic* createTopic(const std::string& topicName);
225 
226  virtual cms::TemporaryQueue* createTemporaryQueue();
227 
228  virtual cms::TemporaryTopic* createTemporaryTopic();
229 
230  virtual cms::Message* createMessage();
231 
232  virtual cms::BytesMessage* createBytesMessage();
233 
234  virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
235 
236  virtual cms::StreamMessage* createStreamMessage();
237 
238  virtual cms::TextMessage* createTextMessage();
239 
240  virtual cms::TextMessage* createTextMessage( const std::string& text );
241 
242  virtual cms::MapMessage* createMapMessage();
243 
244  virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
245 
246  virtual bool isTransacted() const;
247 
248  virtual void unsubscribe(const std::string& name);
249 
250  public: // ActiveMQSessionKernel specific Methods
251 
280  cms::Message* message, int deliveryMode, int priority, long long timeToLive,
281  util::MemoryUsage* producerWindow, long long sendTimeout, cms::AsyncCallback* onComplete);
282 
291  cms::ExceptionListener* getExceptionListener();
292 
300  virtual void setMessageTransformer(cms::MessageTransformer* transformer);
301 
307  virtual cms::MessageTransformer* getMessageTransformer() const;
308 
315  this->checkClosed();
316  return *( this->sessionInfo );
317  }
318 
325  this->checkClosed();
326  return *( this->sessionInfo->getSessionId() );
327  }
328 
333  return this->connection;
334  }
335 
339  Pointer<threads::Scheduler> getScheduler() const;
340 
346  long long getLastDeliveredSequenceId() const {
347  return this->lastDeliveredSequenceId;
348  }
349 
356  void setLastDeliveredSequenceId(long long value) {
357  this->lastDeliveredSequenceId = value;
358  }
359 
369  void oneway(Pointer<commands::Command> command);
370 
385  Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
386 
397  void addConsumer(Pointer<ActiveMQConsumerKernel> consumer);
398 
408  void removeConsumer(Pointer<ActiveMQConsumerKernel> consumer);
409 
420  void addProducer(Pointer<ActiveMQProducerKernel> producer);
421 
431  void removeProducer(Pointer<ActiveMQProducerKernel> producer);
432 
440  virtual void doStartTransaction();
441 
448  return this->transaction;
449  }
450 
455  void acknowledge();
456 
461  void deliverAcks();
462 
469 
473  void wakeup();
474 
479  Pointer<commands::ConsumerId> getNextConsumerId();
480 
485  Pointer<commands::ProducerId> getNextProducerId();
486 
493  void doClose();
494 
501  void dispose();
502 
512  void setPrefetchSize(Pointer<commands::ConsumerId> id, int prefetch);
513 
521 
527  bool isInUse(Pointer<commands::ActiveMQDestination> destination);
528 
533 
538 
547  bool iterateConsumers();
548 
554  void checkMessageListener() const;
555 
561  virtual int getHashCode() const;
562 
572  void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
573 
579  bool isSessionAsyncDispatch() const;
580 
586  void setSessionAsyncDispatch(bool sessionAsyncDispatch);
587 
596 
597  private:
598 
603  long long getNextProducerSequenceId() {
604  return this->producerSequenceIds.getNextSequenceId();
605  }
606 
607  // Checks for the closed state and throws if so.
608  void checkClosed() const;
609 
610  // Send the Destination Creation Request to the Broker, alerting it
611  // that we've created a new Temporary Destination.
612  // @param tempDestination - The new Temporary Destination
613  void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
614 
615  // Send the Destination Destruction Request to the Broker, alerting
616  // it that we've removed an existing Temporary Destination.
617  // @param tempDestination - The Temporary Destination to remove
618  void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
619 
620  // Creates a new Temporary Destination name using the connection id
621  // and a rolling count.
622  // @return a unique Temporary Destination name
623  std::string createTemporaryDestinationName();
624 
625  };
626 
627 }}}
628 
629 #endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_ */
virtual void close()
Terminates the dispatching thread.
Definition: ActiveMQSessionExecutor.h:120
An interface encapsulating a provider-specific topic name.
Definition: Topic.h:36
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition: MessageTransformer.h:37
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:128
Definition: ActiveMQTempDestination.h:34
virtual void stop()
Stops dispatching.
virtual bool isClientAcknowledge() const
Definition: ActiveMQSessionKernel.h:171
Root of all messages.
Definition: Message.h:88
Defines a Temporary Topic based Destination.
Definition: TemporaryTopic.h:39
A Destination object encapsulates a provider-specific address.
Definition: Destination.h:39
#define AMQCPP_API
Definition: Config.h:30
AcknowledgeMode
Definition: Session.h:108
Asynchronous event interface for CMS asynchronous operations.
Definition: AsyncCallback.h:37
std::auto_ptr< ActiveMQSessionExecutor > executor
Sends incoming messages to the registered consumers.
Definition: ActiveMQSessionKernel.h:100
With this acknowledgment mode, the session automatically acknowledges a client's receipt of a message...
Definition: Session.h:117
Definition: SessionInfo.h:48
This class implements in interface for browsing the messages in a Queue without removing them...
Definition: QueueBrowser.h:53
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition: ExceptionListener.h:37
virtual void start()
Starts the dispatching.
cms::Session::AcknowledgeMode ackMode
This Sessions Acknowledgment mode.
Definition: ActiveMQSessionKernel.h:105
virtual bool isDupsOkAcknowledge() const
Definition: ActiveMQSessionKernel.h:167
virtual bool isIndividualAcknowledge() const
Definition: ActiveMQSessionKernel.h:175
A BytesMessage object is used to send a message containing a stream of unsigned bytes.
Definition: BytesMessage.h:66
Pointer< ActiveMQTransactionContext > getTransactionContext()
Gets the Pointer to this Session's TransactionContext.
Definition: ActiveMQSessionKernel.h:447
virtual void wakeup()
wakeup this executer and dispatch any pending messages.
util::LongSequenceGenerator consumerIds
Next available Consumer Id.
Definition: ActiveMQSessionKernel.h:120
util::LongSequenceGenerator producerSequenceIds
Next available Producer Sequence Id.
Definition: ActiveMQSessionKernel.h:115
util::LongSequenceGenerator producerIds
Next available Producer Id.
Definition: ActiveMQSessionKernel.h:110
Defines a Temporary Queue based Destination.
Definition: TemporaryQueue.h:39
ActiveMQConnection * getConnection() const
Gets the ActiveMQConnection that is associated with this session.
Definition: ActiveMQSessionKernel.h:332
A Session object is a single-threaded context for producing and consuming messages.
Definition: Session.h:105
Delegate dispatcher for a single session.
Definition: ActiveMQSessionExecutor.h:44
Definition: ActiveMQProducerKernel.h:44
Message will be acknowledged individually.
Definition: Session.h:146
With this acknowledgment mode, the client acknowledges a consumed message by calling the message's ac...
Definition: Session.h:134
A MapMessage object is used to send a set of name-value pairs.
Definition: MapMessage.h:71
Definition: MemoryUsage.h:28
Interface for a StreamMessage.
Definition: StreamMessage.h:61
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
virtual void clearMessagesInProgress()
Removes all messages in the Dispatch Channel so that non are delivered.
Definition: ActiveMQSessionExecutor.h:90
Definition: ActiveMQException.h:35
Pointer< commands::SessionInfo > sessionInfo
SessionInfo for this Session.
Definition: ActiveMQSessionKernel.h:79
void setLastDeliveredSequenceId(long long value)
Sets the value of the Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:356
const commands::SessionInfo & getSessionInfo() const
Gets the Session Information object for this session, if the session is closed than this method throw...
Definition: ActiveMQSessionKernel.h:314
AtomicBoolean closed
Indicates that this connection has been closed, it is no longer usable after this becomes true...
Definition: ActiveMQSessionKernel.h:95
A client uses a MessageProducer object to send messages to a Destination.
Definition: MessageProducer.h:60
This class is used to generate a sequence of long long values that are incremented each time a new va...
Definition: LongSequenceGenerator.h:32
A client uses a MessageConsumer to received messages from a destination.
Definition: MessageConsumer.h:63
Definition: ArrayList.h:39
long long lastDeliveredSequenceId
Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:125
Definition: SessionId.h:51
Java-like properties class for mapping string names to string values.
Definition: Properties.h:53
Definition: ActiveMQSessionKernel.h:67
long long getLastDeliveredSequenceId() const
Gets the currently set Last Delivered Sequence Id.
Definition: ActiveMQSessionKernel.h:346
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Definition: MessageDispatchChannel.h:34
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Concrete connection used for all connectors to the ActiveMQ broker.
Definition: ActiveMQConnection.h:61
SessionConfig * config
Definition: ActiveMQSessionKernel.h:74
Interface for a text message.
Definition: TextMessage.h:41
ActiveMQConnection * connection
Connection.
Definition: ActiveMQSessionKernel.h:89
const commands::SessionId & getSessionId() const
Gets the Session Id object for this session, if the session is closed than this method throws an exce...
Definition: ActiveMQSessionKernel.h:324
An interface encapsulating a provider-specific queue name.
Definition: Queue.h:37
Pointer< ActiveMQTransactionContext > transaction
Transaction Management object.
Definition: ActiveMQSessionKernel.h:84
virtual bool isAutoAcknowledge() const
Definition: ActiveMQSessionKernel.h:163