准备工作

  • 下载源码git clone https://github.com/apache/activemq.git
  • 使用maven编译源码并且下载依赖mvn clean install
  • 导入到开发工具eclipsemvn eclipse:eclipse 或者idea mvn idea:idea
  • 默认会去maven中央仓库下载jar包,如果下载速度慢可以翻墙或者改成开源中国的镜像仓库 xml <mirrors> <mirror> <id>CN</id> <name>OSChina Central</name> <url>http://maven.oschina.net/content/groups/public/</url> <mirrorOf>central</mirrorOf> </mirror> </mirrors>

遇到的问题

  • idea导入之后无任何显示,事实证明楼主太着急,等编译之后就有了。。

客户端启动

  • java客户端代码我是按照《Java消息服务》这本书中的例子完成的
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
  public class Chat implements javax.jms.MessageListener{

	private TopicSession pubSession;
	private TopicPublisher publisher;
	private TopicConnection connection;
	private String username;

	/**
	 * @param topicFactory
	 * @param topicName
	 * @param username
	 * @throws Exception
	 */
	public Chat(String topicFactory,String topicName,String username) throws Exception {
		//使用JNDI?
		InitialContext ctx = new InitialContext();
		//拿到工场
		TopicConnectionFactory conFactory = (TopicConnectionFactory)ctx.lookup(topicFactory);
		//创建连接
		TopicConnection connection = conFactory.createTopicConnection();

		TopicSession pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

		Topic chatTopic = (Topic)ctx.lookup(topicName);

		TopicPublisher publisher = pubSession.createPublisher(chatTopic);
		String selector = "username=zhishuo";
		TopicSubscriber subscriber = subSession.createSubscriber(chatTopic,selector,true);

		subscriber.setMessageListener(this);

		this.connection = connection;
		this.pubSession = pubSession;
		this.publisher = publisher;
		this.username = username;
		connection.start();

	}


	public void onMessage(Message message) {
		try {
			TextMessage textMessage = (TextMessage)message;
			System.out.println(textMessage.getText());
			System.out.println(message.getJMSDestination());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 发送消息
	 * @param text
	 * @throws JMSException
	 */
	protected void writeMessage(String text) throws JMSException {
		TextMessage textMessage = pubSession.createTextMessage(username+":"+text);
		publisher.publish(textMessage);
	}

	public void close() throws JMSException {
		connection.close();
	}

	public static void main(String[] args) {
		try {
			if(args.length!=3){
				System.out.println("something is missing");
			}
			// topicFactory, topicName, username
			Chat chat = new Chat(args[0],args[1],args[2]);
			BufferedReader commondLine = new BufferedReader(new InputStreamReader(System.in));

			while(true){
				String s = commondLine.readLine();
				if(s.equalsIgnoreCase("exit")){
					chat.clone();
					System.exit(0);
				}else{
					chat.writeMessage(s);
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
  • 还需要在classpath中加入jndi.properties

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    #配置实现工场
    java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
    #配置activemq地址
    java.naming.provider.url=tcp://localhost:61616
    #这里是配置的安全机制?
    java.naming.security.principal=system
    java.naming.security.credentials=manager
    #订阅主题的名称,可以以逗号,隔开
    connectionFactoryNames=TopicCF
    #topic
    topic.topic1=jms.topic1
  • 启动Chart类main方法时,传入TopicCF topic1 zhishuo这样客户端就可以正常启动, 并且发送消息了,如果启动多个客户端,然后按回车发送消息,所有的人都可以看到。

代码分析

  • InitialContext ctx = new InitialContext(); 这里是JMS给出的公用调用方法,new对象时调用了默认的init()方法。 myProps = (Hashtable<Object,Object>) ResourceManager.getInitialEnvironment(environment);里面主要是去加载当前环境变量 和一些JVM设置的环境变量参数。
  • 主要初始化都在getDefaultInitCtx()--->NamingManager.getInitialContext(myProps); 中完成,会在环境变量配置参数中找到String INITIAL_CONTEXT_FACTORY = "java.naming.factory.initial"; key值,由于我们这里配置的是java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory,所以实例化的也就是ActiveMQInitialcontextFactory,最后再调用ActiveMQInitialcontextFactory.getInitialContext(env)这里传过去的参数是环境变量。
  • 我们来到ActiveMQInitialcontextFactory.getInitialContext(env)方法中,此方法便是真正用来初始化队列, 主题

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    
    public Context getInitialContext(Hashtable environment) throws NamingException {
       // lets create a factory
       Map<String, Object> data = new ConcurrentHashMap<String, Object>();
       String[] names = getConnectionFactoryNames(environment);
       for (int i = 0; i < names.length; i++) {
           ActiveMQConnectionFactory factory = null;
           String name = names[i];
    
           try {
               factory = createConnectionFactory(name, environment);
           } catch (Exception e) {
               throw new NamingException("Invalid broker URL");
    
           }
           /*
            * if( broker==null ) { try { broker = factory.getEmbeddedBroker(); }
            * catch (JMSException e) { log.warn("Failed to get embedded
            * broker", e); } }
            */
           data.put(name, factory); // 实际上是 tocipIF ActiveMQConnectionFactory
       }
    
       createQueues(data, environment);
       createTopics(data, environment);
       /*
        * if (broker != null) { data.put("destinations",
        * broker.getDestinationContext(environment)); }
        */
       data.put("dynamicQueues", new LazyCreateContext() {
           private static final long serialVersionUID = 6503881346214855588L;
    
           @Override
           protected Object createEntry(String name) {
               return new ActiveMQQueue(name);
           }
       });
       data.put("dynamicTopics", new LazyCreateContext() {
           private static final long serialVersionUID = 2019166796234979615L;
    
           @Override
           protected Object createEntry(String name) {
               return new ActiveMQTopic(name);
           }
       });
    
       return createContext(environment, data);
    }

getConnectionFactoryNames方法中,是为了遍历并且初始化connectionFactoryNames配置的Value值,我们这里配置的connectionFactoryNames=TopicCF,这里用到了StringTokenizer感觉很好用,平时自己都是用string的split,没想到还有这种用法。 然后遍历刚才解析的Value值,并且最终创建ActiveMQConnectionFactory对象返回。 把创建好的对像放入Hashmap中,map.put(TopicCF,ActiveMQConnectionFactory), > failover://tcp://localhost:61616 这是此对象初始化时url属性的默认值 初始化的时候修改成了tcp://localhost:61616

这里是每一个value值都对应一个自己的工厂类。

接下来创建Queues和Topics查找配置文件中以queue.和topic.开头的,并且生成ActiveMQQueue对象。 我们这里配置的jms.topic1最后也放入map.put(topic1,ActiveMQTopic)。

接下来创建了动态的队列和主题,不知何用。 dynamicQueues dynamicTopics 并且都放入了map 最后创建了ReadOnlyContext对象,并且把相关的数据绑定都传入其中。 至此第一句代码初始化完成。

  • (TopicConnectionFactory)ctx.lookup(topicFactory)这里是去加载实现工厂的类,由于上一个初始化最终返回的是 ReadOnlyContext对象,所以这里调用的是ReadOnlyContext.lookup方法,我们来看public Object lookup(String name) throws NamingException {方法,这里其实就是根据传入的名称,取出相应的工厂,由于我们在启动时传入的工厂类名是 TopicCF 所以,这里就是取出刚才放入的ActiveMQConnectionFactory工厂类,类图如下 类图

  • conFactory.createTopicConnection(),这里是调用的ActiveMQConnectionFactory中的方法,该类中初始化了传输协议模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    if (scheme.equals("auto")) {
     connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
    } else if (scheme.equals("auto+ssl")) {
     connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
    } else if (scheme.equals("auto+nio")) {
     connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
    } else if (scheme.equals("auto+nio+ssl")) {
     connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
    }

    如果以上都没有配置,默认使用TCP协议传输,在TransportFactory.connect()-->findTransportFactory()中,查找资源对应的配置协议,如果没有查到,使用默认的在 private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");此文件目录中,因为这里是用的tcp://localhost:61616所以这里使用的就是META-INF/services/org/apache/activemq/transport/tcp文件,此文件中实际配置的是TCP的传输器

    1
    
    class=org.apache.activemq.transport.tcp.TcpTransportFactory

    然后调TcpTransportFactory.doConnect()进行连接,至此传输器创建完成。

  • 接下来创建ActiveMQConnection连接

  • 最后启动连接,并且设置一些默认值,至此创建连接完成,并返回连接。

  • TopicSession pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);创建发布和订阅Session, 最终调用的是ActiveMQConnection.createTopicSession(),创建ActiveMQSession对像并返回。

  • Topic chatTopic = (Topic)ctx.lookup(topicName);加载主题,这里的值为topic1, 因为在前边初始化的时候已经放入了对像,所以这里的值为ActiveMQTopic,调用ActiveMQSession.createPublisher() 和 createSubscriber() 创建发布者ActiveMQTopicPublisher和消费者ActiveMQTopicSubscriber。

  • 创建完成之后,把自己设置为订阅者的监听器,最终启动连接,客户端启动代码分析完成,因为有一些自己也不是很理解,所以未能表达。

服务端启动

  • 以下是服务端启动的类图调用关系: 启动类图

这里还了解了java -D 是可以把一些参数设置到JVM中的

服务端启动这里充分体现了命令设计模式的案例,我认为是很好的学习例子,最终调用的是StartCommand.runTask()方法,类中有如下几个重要功能 - 初始化broker - 添加JVM shutdownhook 停止时进行清理

下面来看broker = BrokerFactory.createBroker(configURI);此类中在创建工厂时也使用了文件配置的方法由于默认是使用的xbean:activemq.xml资源文件,所以在创建工厂时就是实例化的 META-INF/services/org/apache/activemq/broker/xbean文件中配置的类,

1
class=org.apache.activemq.xbean.XBeanBrokerFactory

实际调用的是XBeanBrokerFactory.createBroker()此类中创建了Spring的ResourceXmlApplicationContext实现对象,但不知这里为何要这样。这里最后还调用了spring框架中refresh方法,此方法会初始化spring整个框架。 最终创建一个BrokerService返回,broker中包含了所有上下文环境。

KahaDBPersistenceAdapter 调用关系图 调用关系

KahaDBPersistenceAdapter 用例图

调用关系 最后启动,至此服务启动完成。

问题