学习 Druid(四):HDFS 数据摄入

由于业务调整,需要批量修正历史数据。

或者,在 Lambda 架构 中,使用 Druid 作为 Serving 层,使用 Flink 作为 Speed 层,使用 Spark 作为 Batch 层。

Druid 提供了两种批量数据摄入的方式:

  • 原生批量摄入,适用于小批量和大批量数据索引;
  • 基于 Hadoop 批量摄入,适用于海量数据索引;

原生批量摄入

Druid 提供了单任务的 Simple 任务(类型为 index)和并发多任务的 Parallel 任务(类型为 index_parallel)两种原生数据摄入方式。

Simple 任务用于小批量数据索引。

Parallel 任务用于大批量数据索引,启动一个 Supervisor 任务,由 Supervisor 生成多个子任务。

原生批量摄入支持以下 Filehose:

  • LocalFirehose 本地文件;
  • HttpFirehose 通过 HTTP 访问的远程文件;
  • IngestSegmentFirehose 已存在的 Segment 文件;
  • SqlFirehose 通过 SQL 查询数据库数据,支持 MySQL 和 PostgreSQL;
  • InlineFirehose 内联数据,仅供调试和测试;
  • CombiningFirehose 合并多个 Filehose 资源。

👇,介绍如何启动 Parallel 任务通过 HttpFirehose 摄入 HDFS 数据。

1. 开启 WebHDFS(可选)

WebHDFS 是 Hadoop 提供的用于操作 HDFS 的 REST API。

编辑 hdfs-site.xml 文件,启动 WebHDFS:

<property>  
  <name>dfs.webhdfs.enabled</name>
  <value>true</value>
</property>  

重启 HDFS 集群。

2. 启动 Parallel 任务

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "test",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": [
              "name"
            ]
          }
        }
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "segmentGranularity": "DAY"
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "firehose" : {
        "type" : "http",
        "uris" : [] 
       }
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxNumConcurrentSubTasks" : 2,
      "forceGuaranteedRollup" : false
    }
  }
}

Task 配置:

  • type index_parallel 必填;
  • dataSchema 指定摄入数据的 schema,必填;
  • ioConfig Supervisor 和 Indexer 任务配置,必填;
  • tuningConfig Supervisor 和 Indexer 任务调优配置,选填。
2.1 ioConfig

配置项 firhose.type,http 使用 HttpFirehose 摄入数据,必填;

配置项 firehose.uris,HTTP URI 数组,子任务会使用该 URI 通过 HTTP 加载远端数据。

2.2 tuningConfig

配置项 maxNumConcurrentSubTasks,同时启动的子任务最大并行数;

配置项 forceGuaranteedRollup,是否强制保证 perfect rollup。值为 false 只执行单个阶段进行索引,值为 true 执行带数据 shuffle 的两个阶段索引。

基于 Hadoop 批量摄入

TODO

参考