利用 Apache Flume 读取 JMS 消息队列消息，并将消息写入 HDFS。flume agent 配置如下:
flume-agent.conf
# Name the components on this agent.
# Pipeline: JMS (ActiveMQ) source -> memory channel -> HDFS sink.
agentHdfs.sources = jms_source
agentHdfs.sinks = hdfs_sink
agentHdfs.channels = mem_channel

# Describe/configure the JMS source (ActiveMQ via JNDI)
agentHdfs.sources.jms_source.type = jms
agentHdfs.sources.jms_source.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
agentHdfs.sources.jms_source.connectionFactory = ConnectionFactory
# ActiveMQ queue name to consume from
agentHdfs.sources.jms_source.destinationName = BUSINESS_DATA
agentHdfs.sources.jms_source.providerURL = tcp://hadoop-master:61616
agentHdfs.sources.jms_source.destinationType = QUEUE

# Describe the HDFS sink
agentHdfs.sinks.hdfs_sink.type = hdfs
# Time-bucketed output path; %Y-%m-%d/%H escapes require an event timestamp
# (supplied here by useLocalTimeStamp = true below)
agentHdfs.sinks.hdfs_sink.hdfs.path = hdfs://hadoop-master/data/flume/%Y-%m-%d/%H
agentHdfs.sinks.hdfs_sink.hdfs.filePrefix = %{hostname}/events-
agentHdfs.sinks.hdfs_sink.hdfs.maxOpenFiles = 5000
agentHdfs.sinks.hdfs_sink.hdfs.batchSize = 500
# DataStream + Text: write raw event bodies as plain text (no SequenceFile wrapping)
agentHdfs.sinks.hdfs_sink.hdfs.fileType = DataStream
agentHdfs.sinks.hdfs_sink.hdfs.writeFormat = Text
# rollSize = 0 disables size-based rolling; roll every 1,000,000 events
# or every 600 seconds, whichever comes first
agentHdfs.sinks.hdfs_sink.hdfs.rollSize = 0
agentHdfs.sinks.hdfs_sink.hdfs.rollCount = 1000000
agentHdfs.sinks.hdfs_sink.hdfs.rollInterval = 600
agentHdfs.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
agentHdfs.channels.mem_channel.type = memory
agentHdfs.channels.mem_channel.capacity = 1000
agentHdfs.channels.mem_channel.transactionCapacity = 100

# Bind the source and sink to the channel
agentHdfs.sources.jms_source.channels = mem_channel
agentHdfs.sinks.hdfs_sink.channel = mem_channel