Flume 1.6.0 spooling directory source with timestamp on header
I am trying to create a new Flume agent with a spooldir source that puts the files into HDFS. This is my configuration file:
```
agent.sources = file
agent.channels = channel
agent.sinks = hdfsSink

# SOURCES CONFIGURATION

# SINKS CONFIGURATION
agent.sinks.hdfsSink.hdfs.filePrefix = common

# CHANNELS CONFIGURATION
```
I am getting an error that says Expected timestamp in the Flume event headers, but it was null. The files I am reading contain JSON structures that have a field named timestamp.

Is there a way to add this timestamp to the event headers?
- Let me try it and I will report back for sure, but it will take a few hours, OK!
As described in this article:

http://shzhangji.com/blog/2017/08/05/how-to-extract-event-time-in-apache-flume/

the required change is to include an interceptor and its serializers:
```
# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files
agent.sources.file.interceptors = i1
agent.sources.file.interceptors.i1.type = regex_extractor
agent.sources.file.interceptors.i1.regex = <regex_for_timestamp>
agent.sources.file.interceptors.i1.serializers = s1
agent.sources.file.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent.sources.file.interceptors.i1.serializers.s1.name = timestamp
agent.sources.file.interceptors.i1.serializers.s1.pattern = <pattern_that_matches_your_regex>
```
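To make the two placeholders concrete: the interceptor runs `regex` against each event body, and the millis serializer parses the captured group using `pattern` (a Java SimpleDateFormat string) into epoch milliseconds, which become the `timestamp` header. A small Python sketch of what this pipeline effectively does, assuming a hypothetical event body whose timestamp field looks like `"timestamp": "2017-08-05 12:30:00"` (adapt the regex and pattern to your actual data):

```python
import re
from datetime import datetime, timezone

# Hypothetical event body: one JSON line with a "timestamp" field.
event_body = '{"id": 1, "timestamp": "2017-08-05 12:30:00"}'

# Equivalent of i1.regex: capture the timestamp string from the body.
regex = r'"timestamp":\s*"([^"]+)"'
match = re.search(regex, event_body)
raw_ts = match.group(1)

# Equivalent of the MillisSerializer with pattern "yyyy-MM-dd HH:mm:ss":
# parse the captured text and convert it to epoch milliseconds.
dt = datetime.strptime(raw_ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
headers = {"timestamp": str(int(dt.timestamp() * 1000))}
print(headers)
```

Note that Flume's pattern uses Java SimpleDateFormat letters (`yyyy-MM-dd HH:mm:ss`), not Python's `strptime` directives; the sketch only mirrors the behavior. The time zone used for parsing is an assumption here (UTC).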
Thanks for pointing out that I needed to include a proper snippet in addition to the link :)
Following up on my earlier comment, I will now share the complete steps I followed: spooling a JSON file with the timestamp header enabled, putting it into a Hadoop HDFS cluster with Flume, creating an external Hive table over the JSON files, and then executing DML queries on it.
First, I created flume-spool.conf:
```
# Flume configuration starts
erum.sources = source-1
erum.channels = file-channel-1
erum.sinks = hdfs-sink-1
erum.sources.source-1.channels = file-channel-1

# Define a file channel called file-channel-1 on erum
erum.channels.file-channel-1.capacity = 2000000

# Define a source for erum
# Spooldir in my case is /home/arif/practice/flume_sink
# Sink is flume_import under hdfs
erum.sinks.hdfs-sink-1.hdfs.filePrefix = common
erum.sinks.hdfs-sink-1.hdfs.batchSize = 1000
```
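The snippet above lost several lines when the page was scraped; a minimal spooldir-to-HDFS agent of this shape would also need source, channel, and sink type definitions along the following lines. This is a sketch, not the author's original file: the spool directory comes from the comment above, but the channel type, HDFS URL, and target path are assumptions.

```
erum.sources.source-1.type = spooldir
erum.sources.source-1.spoolDir = /home/arif/practice/flume_sink

erum.channels.file-channel-1.type = file

erum.sinks.hdfs-sink-1.type = hdfs
erum.sinks.hdfs-sink-1.channel = file-channel-1
# Assumed namenode URL and target directory
erum.sinks.hdfs-sink-1.hdfs.path = hdfs://localhost:9000/user/arif/flume_import
# Write the raw JSON lines instead of the default SequenceFile
erum.sinks.hdfs-sink-1.hdfs.fileType = DataStream
```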
Now we run flume-spool with the agent named erum:
```
bin/flume-ng agent -n erum -c conf -f conf/flume-spool.conf -Dflume.root.logger=DEBUG,console
```
Copy the products.json file into the directory specified by erum.sources.source-1.spoolDir in the Flume configuration. The content of the products.json file is as follows:
```
{"productid":"5968dd23fc13ae04d9000001","product_name":"sildenafilcitrate","mfgdate":"20160719031109","supplier":"WisozkInc","quantity":261,"unit_cost":"$10.47"}
{"productid":"5968dd23fc13ae04d9000002","product_name":"MountainJuniperusashei","mfgdate":"20161003021009","supplier":"Keebler-Hilpert","quantity":292,"unit_cost":"$8.74"}
{"productid":"5968dd23fc13ae04d9000003","product_name":"DextromathorphanHBr","mfgdate":"20161101041113","supplier":"Schmitt-Weissnat","quantity":211,"unit_cost":"$20.53"}
{"productid":"5968dd23fc13ae04d9000004","product_name":"MeophanHBr","mfgdate":"20161101061113","supplier":"Schmitt-Weissnat","quantity":198,"unit_cost":"$18.73"}
```
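Because the Hive JSON SerDe used later maps columns to JSON keys by name and fails on malformed lines, it is worth sanity-checking the file before dropping it into the spool directory. A minimal Python sketch (the four records are inlined here rather than read from disk):

```python
import json

# The four records above, one JSON object per line, as the SerDe expects.
lines = [
    '{"productid":"5968dd23fc13ae04d9000001","product_name":"sildenafilcitrate","mfgdate":"20160719031109","supplier":"WisozkInc","quantity":261,"unit_cost":"$10.47"}',
    '{"productid":"5968dd23fc13ae04d9000002","product_name":"MountainJuniperusashei","mfgdate":"20161003021009","supplier":"Keebler-Hilpert","quantity":292,"unit_cost":"$8.74"}',
    '{"productid":"5968dd23fc13ae04d9000003","product_name":"DextromathorphanHBr","mfgdate":"20161101041113","supplier":"Schmitt-Weissnat","quantity":211,"unit_cost":"$20.53"}',
    '{"productid":"5968dd23fc13ae04d9000004","product_name":"MeophanHBr","mfgdate":"20161101061113","supplier":"Schmitt-Weissnat","quantity":198,"unit_cost":"$18.73"}',
]

expected_fields = {"productid", "product_name", "mfgdate",
                   "supplier", "quantity", "unit_cost"}

records = [json.loads(line) for line in lines]  # raises ValueError if malformed
for record in records:
    # Field names must match the Hive column names exactly.
    assert set(record) == expected_fields

print(len(records), "valid records")
```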
Download hive-serdes-sources-1.0.6.jar from the following URL:

```
https://www.dropbox.com/s/lsjgk2zaqz8uli9/hive-serdes-sources-1.0.6.jar?dl=0
```
After spooling the JSON file into the HDFS cluster with flume-spool, start the Hive server, log in to the Hive shell, and then do the following:
```
hive> add jar /home/arif/applications/hadoop/apache-hive-2.1.1-bin/lib/hive-serdes-sources-1.0.6.jar;
hive> create external table products (productid string, product_name string, mfgdate string,
    >     supplier string, quantity int, unit_cost string)
    > row format serde 'com.cloudera.hive.serde.JSONSerDe'
    > location '/user/arif/flume_sink/products/';
OK
Time taken: 0.211 seconds
hive> select * from products;
OK
5968dd23fc13ae04d9000001    sildenafilcitrate       20160719031109    WisozkInc           261    $10.47
5968dd23fc13ae04d9000002    MountainJuniperusashei  20161003021009    Keebler-Hilpert     292    $8.74
5968dd23fc13ae04d9000003    DextromathorphanHBr     20161101041113    Schmitt-Weissnat    211    $20.53
5968dd23fc13ae04d9000004    MeophanHBr              20161101061113    Schmitt-Weissnat    198    $18.73
Time taken: 0.291 seconds, Fetched: 4 row(s)
```
I completed all of these steps without any errors. I hope this helps you, thanks.
- Hello arifmustafa, thanks for your reply. The problem with your answer is that the Hadoop directory is static and does not use the timestamp from the header, which is why it is not a suitable solution to my problem.
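For reference, the usual way to make the sink directory follow event time is to combine a timestamp interceptor (such as the regex_extractor shown in the accepted answer) with escape sequences in the HDFS sink path, which Flume resolves from the event's timestamp header. A sketch, where the agent/sink names, namenode URL, and directory layout are assumptions:

```
# Bucket events by the timestamp header, e.g. .../20160719/03
agent.sinks.hdfsSink.hdfs.path = hdfs://namenode:8020/user/flume/products/%Y%m%d/%H
# Keep false (the default) so %Y/%m/%d/%H come from the event's timestamp
# header rather than from the local clock at write time
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = false
```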
Source: https://www.codenong.com/48110507/