学习 Druid(三):Kafka 数据摄入

Druid 支持两种 Kafka 数据摄入方式:

  • Push 通过 Tranquility;
  • Pull 通过 Kafka Indexing Service。

Tranquility

TODO

Kafka Indexing Service

1. 编辑配置文件

编辑 Overload 和 MiddleManager 的 conf/druid/cluster/_common/common.runtime.properties 配置文件。

加载 Kafka Indexing Service 扩展:

druid.extensions.loadList=["mysql-metadata-storage"]  

2. 编写 Supervisor 说明文件

官网🌰:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}

Supervisor 配置:

  • type kafka,必填;
  • dataSchema 指定摄入数据的 schema,必填;
  • ioConfig Supervisor 和 Indexer 任务配置,必填;
  • tuningConfig Supervisor 和 Indexer 任务调优配置,选填。
2.1 DataSchema

Druid 的 Datasource 可以作为关系型数据库中的 Table。

Druid 的 Schema 符合时序库的数据结构,包括:

  • 时间戳(Timestamp) 描述事件发生的时间;
  • 维度(Dimensions) 描述事件的属性;
  • 指标(Metrics) 描述事件的值;

Segment

数据在 Datasource 中按时间戳进行分区,每一个分区被称为 Chunk,Chunk 中每一个文件被称为 Segment。

Chunk

2.2 IOConfig

配置项 topic,Kafka Topic,必填;

配置项 consumerProperties,Kafka Consumer 配置项,必填;

配置项 replicas,副本集数;

配置项 taskCount,每个副本集任务数。

2.3 TuningConfig

略。

3. 提交 Supervisor

Druid 提供两种方法提交 Supervisor:

  1. UI,在 Tasks 菜单,点击 Submit Supervisor 按钮;
  2. REST APIcurl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor

Trouble Shooting

索引 Task 报错 Unrecognized VM option 'ExitOnOutOfMemoryError'

错误日志:

Unrecognized VM option 'ExitOnOutOfMemoryError'  
Did you mean 'OnOutOfMemoryError=<value>'?  
Error: Could not create the Java Virtual Machine.  
Error: A fatal exception has occurred. Program will exit.  

解决办法:

导致该问题的原因是主机 JVM 不支持 ExitOnOutOfMemoryError 参数。

第一种方法,升级至 JDK 1.8 8u92 以上版本(推荐!!!)。

第二种方法,编辑 conf/druid/cluster/data/middleManager/runtime.properties 文件,找到 druid.indexer.runner.javaOpts 配置项:

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager  

移除其中的 -XX:+ExitOnOutOfMemoryError JVM 配置项。

参考