activemq-cpp-3.9.5
ActiveMQConnection.h
Go to the documentation of this file.
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements. See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License. You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
19 #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
20 
21 #include <cms/EnhancedConnection.h>
22 #include <activemq/util/Config.h>
34 #include <decaf/util/Properties.h>
35 #include <decaf/util/ArrayList.h>
41 
42 #include <string>
43 #include <memory>
44 
45 namespace activemq {
46 namespace core {
47 
49 
50  class ActiveMQSession;
51  class ConnectionConfig;
52  class PrefetchPolicy;
53  class RedeliveryPolicy;
54 
63  private:
64 
65  ConnectionConfig* config;
66 
70  std::auto_ptr<cms::ConnectionMetaData> connectionMetaData;
71 
76 
82 
88 
93 
94  private:
95 
97  ActiveMQConnection& operator=(const ActiveMQConnection&);
98 
99  public:
100 
110  const Pointer<decaf::util::Properties> properties);
111 
112  virtual ~ActiveMQConnection();
113 
122  virtual void addSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session);
123 
132  virtual void removeSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> session);
133 
142  virtual void addProducer(Pointer<kernels::ActiveMQProducerKernel> producer);
143 
149  virtual void removeProducer(const Pointer<commands::ProducerId>& producerId);
150 
157  virtual void addDispatcher(const Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher);
158 
164  virtual void removeDispatcher(const Pointer<commands::ConsumerId>& consumer);
165 
176  virtual void sendPullRequest(const commands::ConsumerInfo* consumer, long long timeout);
177 
182  bool isClosed() const {
183  return this->closed.get();
184  }
185 
190  bool isStarted() const {
191  return this->started.get();
192  }
193 
198  bool isTransportFailed() const {
199  return this->transportFailed.get();
200  }
201 
220  virtual void destroyDestination(const commands::ActiveMQDestination* destination);
221 
240  virtual void destroyDestination(const cms::Destination* destination);
241 
252  bool isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
253 
262  void rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
263 
270  void removeAuditedDispatcher(Dispatcher* dispatcher);
271 
272  public: // Connection Interface Methods
273 
277  virtual const cms::ConnectionMetaData* getMetaData() const {
278  return connectionMetaData.get();
279  }
280 
284  virtual cms::Session* createSession();
285 
289  virtual std::string getClientID() const;
290 
294  virtual void setClientID(const std::string& clientID);
295 
299  virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode);
300 
304  virtual void close();
305 
309  virtual void start();
310 
314  virtual void stop();
315 
319  virtual cms::ExceptionListener* getExceptionListener() const;
320 
324  virtual void setExceptionListener(cms::ExceptionListener* listener);
325 
329  virtual void setMessageTransformer(cms::MessageTransformer* transformer);
330 
334  virtual cms::MessageTransformer* getMessageTransformer() const;
335 
339  virtual cms::DestinationSource* getDestinationSource();
340 
341  public: // Configuration Options
342 
347  void setUsername(const std::string& username);
348 
354  const std::string& getUsername() const;
355 
360  void setPassword(const std::string& password);
361 
367  const std::string& getPassword() const;
368 
373  void setDefaultClientId(const std::string& clientId);
374 
380  void setBrokerURL(const std::string& brokerURL);
381 
387  const std::string& getBrokerURL() const;
388 
397  void setPrefetchPolicy(PrefetchPolicy* policy);
398 
404  PrefetchPolicy* getPrefetchPolicy() const;
405 
414  void setRedeliveryPolicy(RedeliveryPolicy* policy);
415 
421  RedeliveryPolicy* getRedeliveryPolicy() const;
422 
426  bool isDispatchAsync() const;
427 
436  void setDispatchAsync(bool value);
437 
443  bool isAlwaysSyncSend() const;
444 
450  void setAlwaysSyncSend(bool value);
451 
456  bool isUseAsyncSend() const;
457 
462  void setUseAsyncSend(bool value);
463 
468  bool isUseCompression() const;
469 
476  void setUseCompression(bool value);
477 
487  void setCompressionLevel(int value);
488 
494  int getCompressionLevel() const;
495 
500  unsigned int getSendTimeout() const;
501 
507  void setSendTimeout(unsigned int timeout);
508 
513  unsigned int getConnectResponseTimeout() const;
514 
521  void setConnectResponseTimeout(unsigned int connectResponseTimeout);
522 
527  unsigned int getCloseTimeout() const;
528 
533  void setCloseTimeout(unsigned int timeout);
534 
542  unsigned int getProducerWindowSize() const;
543 
550  void setProducerWindowSize(unsigned int windowSize);
551 
556  bool isMessagePrioritySupported() const;
557 
565  void setMessagePrioritySupported(bool value);
566 
571  long long getNextTempDestinationId();
572 
577  long long getNextLocalTransactionId();
578 
585  bool isWatchTopicAdvisories() const;
586 
594  void setWatchTopicAdvisories(bool value);
595 
604  int getAuditDepth() const;
605 
615  void setAuditDepth(int auditDepth);
616 
622  int getAuditMaximumProducerNumber() const;
623 
630  void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
631 
644  bool isCheckForDuplicates() const;
645 
659  void setCheckForDuplicates(bool checkForDuplicates);
660 
668  bool isTransactedIndividualAck() const;
669 
678  void setTransactedIndividualAck(bool transactedIndividualAck);
679 
686  bool isNonBlockingRedelivery() const;
687 
696  void setNonBlockingRedelivery(bool nonBlockingRedelivery);
697 
703  long long getConsumerFailoverRedeliveryWaitPeriod() const;
704 
711  void setConsumerFailoverRedeliveryWaitPeriod(long long value);
712 
716  bool isOptimizeAcknowledge() const;
717 
724  void setOptimizeAcknowledge(bool optimizeAcknowledge);
725 
731  long long getOptimizeAcknowledgeTimeOut() const;
732 
739  void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut);
740 
749  long long getOptimizedAckScheduledAckInterval() const;
750 
760  void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval);
761 
767  bool isUseRetroactiveConsumer() const;
768 
777  void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
778 
784  bool isExclusiveConsumer() const;
785 
793  void setExclusiveConsumer(bool exclusiveConsumer);
794 
801  bool isSendAcksAsync() const;
802 
810  void setSendAcksAsync(bool sendAcksAsync);
811 
815  bool isAlwaysSessionAsync() const;
816 
823  void setAlwaysSessionAsync(bool alwaysSessionAsync);
824 
828  bool isConsumerExpiryCheckEnabled();
829 
837  void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
838 
842  int getProtocolVersion() const;
843 
844  public: // TransportListener
845 
857  void addTransportListener(transport::TransportListener* transportListener);
858 
867  void removeTransportListener(transport::TransportListener* transportListener);
868 
874  virtual void onCommand(const Pointer<commands::Command> command);
875 
880  virtual void onException(const decaf::lang::Exception& ex);
881 
885  virtual void transportInterrupted();
886 
890  virtual void transportResumed();
891 
892  public:
893 
900  const commands::ConnectionInfo& getConnectionInfo() const;
901 
908  const commands::ConnectionId& getConnectionId() const;
909 
915  transport::Transport& getTransport() const;
916 
922  Pointer<threads::Scheduler> getScheduler() const;
923 
930  std::string getResourceManagerId() const;
931 
936  void cleanup();
937 
948  void oneway(Pointer<commands::Command> command);
949 
964  Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
965 
978  void asyncRequest(Pointer<commands::Command> command, cms::AsyncCallback* onComplete);
979 
984  virtual void fire(const exceptions::ActiveMQException& ex);
985 
990  void setTransportInterruptionProcessingComplete();
991 
999  void setFirstFailureError(decaf::lang::Exception* error);
1000 
1006  decaf::lang::Exception* getFirstFailureError() const;
1007 
1014  void onAsyncException(const decaf::lang::Exception& ex);
1015 
1022  void onClientInternalException(const decaf::lang::Exception& ex);
1023 
1029  void checkClosed() const;
1030 
1036  void checkClosedOrFailed() const;
1037 
1041  void ensureConnectionInfoSent();
1042 
1046  decaf::util::concurrent::ExecutorService* getExecutor() const;
1047 
1055  void addTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
1056 
1064  void removeTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
1065 
1075  void deleteTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
1076 
1083  void cleanUpTempDestinations();
1084 
1091  bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const;
1092 
1100 
1101  protected:
1102 
1106  virtual Pointer<commands::SessionId> getNextSessionId();
1107 
1108  // Sends a oneway disconnect message to the broker.
1109  void disconnect(long long lastDeliveredSequenceId);
1110 
1111  // Waits for all Consumers to handle the Transport Interrupted event.
1112  void waitForTransportInterruptionProcessingToComplete();
1113 
1114  // Marks processing complete for a single caller when interruption processing completes.
1115  void signalInterruptionProcessingComplete();
1116 
1117  // Allow subclasses to access the original Properties object for this connection.
1118  const decaf::util::Properties& getProperties() const;
1119 
1120  // Process the WireFormatInfo command
1121  void onWireFormatInfo(Pointer<commands::Command> command);
1122 
1123  // Process the ControlCommand command
1124  void onControlCommand(Pointer<commands::Command> command);
1125 
1126  // Process the ConnectionControl command
1127  void onConnectionControl(Pointer<commands::Command> command);
1128 
1129  // Process the ConsumerControl command
1130  void onConsumerControl(Pointer<commands::Command> command);
1131 
1132  };
1133 
1134 }}
1135 
1136 #endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/
Provides an interface for clients to transform cms::Message objects inside the CMS MessageProducer an...
Definition: MessageTransformer.h:37
Interface for a RedeliveryPolicy object that controls how message Redelivery is handled in ActiveMQ-C...
Definition: RedeliveryPolicy.h:34
Definition: ConnectionInfo.h:49
A Destination object encapsulates a provider-specific address.
Definition: Destination.h:39
#define AMQCPP_API
Definition: Config.h:30
AcknowledgeMode
Definition: Session.h:108
Asynchronous event interface for CMS asynchronous operations.
Definition: AsyncCallback.h:37
bool isStarted() const
Check if this connection has been started.
Definition: ActiveMQConnection.h:190
Definition: ConnectionId.h:51
An Executor that provides methods to manage termination and methods that can produce a Future for tra...
Definition: ExecutorService.h:56
If a CMS provider detects a serious problem, it notifies the client application through an ExceptionL...
Definition: ExceptionListener.h:37
bool isClosed() const
Checks if this connection has been closed.
Definition: ActiveMQConnection.h:182
bool isTransportFailed() const
Checks if the Connection's Transport has failed.
Definition: ActiveMQConnection.h:198
A ConnectionMetaData object provides information describing the Connection object.
Definition: ConnectionMetaData.h:31
Interface for a transport layer for command objects.
Definition: Transport.h:60
A Session object is a single-threaded context for producing and consuming messages.
Definition: Session.h:105
Definition: ConsumerInfo.h:51
A boolean value that may be updated atomically.
Definition: AtomicBoolean.h:34
Interface for a Policy object that controls message Prefetching on various destination types in Activ...
Definition: PrefetchPolicy.h:34
Definition: ActiveMQException.h:35
virtual const cms::ConnectionMetaData * getMetaData() const
Gets the metadata for this connection.the connection MetaData pointer ( caller does not own it )...
Definition: ActiveMQConnection.h:277
Definition: ArrayList.h:39
Definition: Exception.h:38
Java-like properties class for mapping string names to string values.
Definition: Properties.h:53
An enhanced CMS Connection instance that provides additional features above the default required feat...
Definition: EnhancedConnection.h:33
Interface for an object responsible for dispatching messages to consumers.
Definition: Dispatcher.h:32
Decaf's implementation of a Smart Pointer that is a template on a Type and is Thread Safe if the defa...
Definition: Pointer.h:53
Provides an object that will provide a snapshot view of Destinations that exist on the Message provid...
Definition: DestinationSource.h:38
Concrete connection used for all connectors to the ActiveMQ broker.
Definition: ActiveMQConnection.h:61
A listener of asynchronous exceptions from a command transport object.
Definition: TransportListener.h:38
Definition: ActiveMQDestination.h:38