読者です 読者をやめる 読者になる 読者になる

そごうソフトウェア研究所

SOA、開発プロセス、ITアーキテクチャなどについて書いています。Twitterやってます@rsogo

MacOSXへのApache Kafkaのセットアップ

kafka.apache.org

0.10.0.1を入れます。

1. Zookeeperの起動

$ bin/zookeeper-server-start.sh config/zookeeper.properties 
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
[2016-10-09 22:17:15,900] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-10-09 22:17:15,903] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-10-09 22:17:15,905] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-10-09 22:17:15,906] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-10-09 22:17:15,906] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2016-10-09 22:17:15,930] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-10-09 22:17:15,930] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)

(中略)

[2016-10-09 22:17:15,991] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-09 22:17:15,991] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-09 22:17:15,991] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-09 22:17:16,015] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

2. Kafkaの起動

$ bin/kafka-server-start.sh config/server.properties 
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
[2016-10-09 22:18:41,676] INFO KafkaConfig values: 
    advertised.host.name = null
    metric.reporters = []
    quota.producer.default = 9223372036854775807
    offsets.topic.num.partitions = 50
    log.flush.interval.messages = 9223372036854775807
    auto.create.topics.enable = true
    controller.socket.timeout.ms = 30000
    log.flush.interval.ms = null
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    replica.socket.receive.buffer.bytes = 65536
    min.insync.replicas = 1
    replica.fetch.wait.max.ms = 500
    num.recovery.threads.per.data.dir = 1
    ssl.keystore.type = JKS
    sasl.mechanism.inter.broker.protocol = GSSAPI
    default.replication.factor = 1
    ssl.truststore.password = null
    log.preallocate = false
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    fetch.purgatory.purge.interval.requests = 1000
    ssl.endpoint.identification.algorithm = null
    replica.socket.timeout.ms = 30000
    message.max.bytes = 1000012
    num.io.threads = 8
    offsets.commit.required.acks = -1
    log.flush.offset.checkpoint.interval.ms = 60000
    delete.topic.enable = false
    quota.window.size.seconds = 1
    ssl.truststore.type = JKS
    offsets.commit.timeout.ms = 5000
    quota.window.num = 11
    zookeeper.connect = localhost:2181
    authorizer.class.name = 
    num.replica.fetchers = 1
    log.retention.ms = null
    log.roll.jitter.hours = 0
    log.cleaner.enable = true
    offsets.load.buffer.size = 5242880
    log.cleaner.delete.retention.ms = 86400000
    ssl.client.auth = none
    controlled.shutdown.max.retries = 3
    queued.max.requests = 500
    offsets.topic.replication.factor = 3
    log.cleaner.threads = 1
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    socket.request.max.bytes = 104857600
    ssl.trustmanager.algorithm = PKIX
    zookeeper.session.timeout.ms = 6000
    log.retention.bytes = -1
    log.message.timestamp.type = CreateTime
    sasl.kerberos.min.time.before.relogin = 60000
    zookeeper.set.acl = false
    connections.max.idle.ms = 600000
    offsets.retention.minutes = 1440
    replica.fetch.backoff.ms = 1000
    inter.broker.protocol.version = 0.10.0-IV1
    log.retention.hours = 168
    num.partitions = 1
    broker.id.generation.enable = true
    listeners = null
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    log.roll.ms = null
    log.flush.scheduler.interval.ms = 9223372036854775807
    ssl.cipher.suites = null
    log.index.size.max.bytes = 10485760
    ssl.keymanager.algorithm = SunX509
    security.inter.broker.protocol = PLAINTEXT
    replica.fetch.max.bytes = 1048576
    advertised.port = null
    log.cleaner.dedupe.buffer.size = 134217728
    replica.high.watermark.checkpoint.interval.ms = 5000
    log.cleaner.io.buffer.size = 524288
    sasl.kerberos.ticket.renew.window.factor = 0.8
    zookeeper.connection.timeout.ms = 6000
    controlled.shutdown.retry.backoff.ms = 5000
    log.roll.hours = 168
    log.cleanup.policy = delete
    host.name = 
    log.roll.jitter.ms = null
    max.connections.per.ip = 2147483647
    offsets.topic.segment.bytes = 104857600
    background.threads = 10
    quota.consumer.default = 9223372036854775807
    request.timeout.ms = 30000
    log.message.format.version = 0.10.0-IV1
    log.index.interval.bytes = 4096
    log.dir = /tmp/kafka-logs
    log.segment.bytes = 1073741824
    log.cleaner.backoff.ms = 15000
    offset.metadata.max.bytes = 4096
    ssl.truststore.location = null
    group.max.session.timeout.ms = 300000
    ssl.keystore.password = null
    zookeeper.sync.time.ms = 2000
    port = 9092
    log.retention.minutes = null
    log.segment.delete.delay.ms = 60000
    log.dirs = /tmp/kafka-logs
    controlled.shutdown.enable = true
    compression.type = producer
    max.connections.per.ip.overrides = 
    log.message.timestamp.difference.max.ms = 9223372036854775807
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    auto.leader.rebalance.enable = true
    leader.imbalance.check.interval.seconds = 300
    log.cleaner.min.cleanable.ratio = 0.5
    replica.lag.time.max.ms = 10000
    num.network.threads = 3
    ssl.key.password = null
    reserved.broker.max.id = 1000
    metrics.num.samples = 2
    socket.send.buffer.bytes = 102400
    ssl.protocol = TLS
    socket.receive.buffer.bytes = 102400
    ssl.keystore.location = null
    replica.fetch.min.bytes = 1
    broker.rack = null
    unclean.leader.election.enable = true
    sasl.enabled.mechanisms = [GSSAPI]
    group.min.session.timeout.ms = 6000
    log.cleaner.io.buffer.load.factor = 0.9
    offsets.retention.check.interval.ms = 600000
    producer.purgatory.purge.interval.requests = 1000
    metrics.sample.window.ms = 30000
    broker.id = 0
    offsets.topic.compression.codec = 0
    log.retention.check.interval.ms = 300000
    advertised.listeners = null
    leader.imbalance.per.broker.percentage = 10
 (kafka.server.KafkaConfig)
[2016-10-09 22:18:41,741] INFO starting (kafka.server.KafkaServer)
[2016-10-09 22:18:41,748] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2016-10-09 22:18:41,781] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)

(略)

[2016-10-09 22:18:41,804] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@3e2e18f2 (org.apache.zookeeper.ZooKeeper)
[2016-10-09 22:18:41,821] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2016-10-09 22:18:41,825] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-10-09 22:18:41,897] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-10-09 22:18:42,091] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x157a9971f8c0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-10-09 22:18:42,093] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-10-09 22:18:42,198] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2016-10-09 22:18:42,224] INFO Loading logs. (kafka.log.LogManager)
[2016-10-09 22:18:42,233] INFO Logs loading complete. (kafka.log.LogManager)
[2016-10-09 22:18:42,436] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-10-09 22:18:42,441] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-10-09 22:18:42,457] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-10-09 22:18:42,509] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-10-09 22:18:42,512] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2016-10-09 22:18:42,552] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,553] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,638] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,648] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,649] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-10-09 22:18:42,737] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,739] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,793] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-10-09 22:18:42,801] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-10-09 22:18:42,802] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-10-09 22:18:42,880] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-10-09 22:18:42,889] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-10-09 22:18:42,890] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-10-09 22:18:42,901] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-10-09 22:18:42,945] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,949] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,955] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.11.5,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-10-09 22:18:42,956] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-10-09 22:18:43,046] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-10-09 22:18:43,046] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser)
[2016-10-09 22:18:43,047] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-10-09 22:19:43,240] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visit,0] (kafka.server.ReplicaFetcherManager)
[2016-10-09 22:19:43,295] INFO Completed load of log page_visit-0 with log end offset 0 (kafka.log.Log)
[2016-10-09 22:19:43,298] INFO Created log for partition [page_visit,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-10-09 22:19:43,299] INFO Partition [page_visit,0] on broker 0: No checkpointed highwatermark is found for partition [page_visit,0] (kafka.cluster.Partition)
[2016-10-09 22:28:42,798] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

3. Topicの作成

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic page_visit
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "page_visit".

MacOSXへのActiveMQのセットアップ

MQTTを試したいので、ActiveMQをローカルに入れたいと思います。

1. モジュールの入手

http://activemq.apache.org/ 今は5.14.1 が最新みたいです。

Windows Distributionと、Unix/Linux/Cygwin Distributionがあるので、Unix/Linux/Cygwin Distributionの方をダウンロードして、解凍します。

2. 起動

解凍した場所をACTIVEMQ_HOMEとします。$ACTIVEMQ_HOME/bin配下のactivemq startで起動できます。

$ bin/activemq start
INFO: Loading '/Users/rsogo/work/apppot-iot/apache-activemq-5.14.1//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/Users/rsogo/work/apppot-iot/apache-activemq-5.14.1//data/activemq.pid' (pid '48834')

ログは$ACTIVEMQ_HOME/data/activemq.logにでているっぽい。

$ tail -f data/activemq.log 
2016-10-09 21:35:59,708 | INFO  | Apache ActiveMQ 5.14.1 (localhost, ID:******) started | org.apache.activemq.broker.BrokerService | main
2016-10-09 21:35:59,710 | INFO  | For help or more information please see: http://activemq.apache.org | org.apache.activemq.broker.BrokerService | main
2016-10-09 21:35:59,723 | WARN  | Store limit is 102400 mb (current store usage is 0 mb). The data directory: /Users/rsogo/work/apppot-iot/apache-activemq-5.14.1/data/kahadb only has 24747 mb of usable space. - resetting to maximum available disk space: 24747 mb | org.apache.activemq.broker.BrokerService | main
2016-10-09 21:35:59,726 | WARN  | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: /Users/rsogo/work/apppot-iot/apache-activemq-5.14.1/data only has 24747 mb of usable space. - resetting to maximum available disk space: 24747 mb | org.apache.activemq.broker.BrokerService | main
2016-10-09 21:36:00,382 | INFO  | No Spring WebApplicationInitializer types detected on classpath | /admin | main
2016-10-09 21:36:00,546 | INFO  | ActiveMQ WebConsole available at http://0.0.0.0:8161/ | org.apache.activemq.web.WebConsoleStarter | main
2016-10-09 21:36:00,546 | INFO  | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/ | org.apache.activemq.web.WebConsoleStarter | main
2016-10-09 21:36:00,615 | INFO  | Initializing Spring FrameworkServlet 'dispatcher' | /admin | main
2016-10-09 21:36:00,891 | INFO  | No Spring WebApplicationInitializer types detected on classpath | /api | main
2016-10-09 21:36:00,973 | INFO  | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml | /api | main

3. 管理画面の確認

ログにでている通り、8161ポートでWebConsoleが動いています。

f:id:begirama:20161009214447p:plain

4. キューの作成

f:id:begirama:20161009215024p:plain

5. テストメッセージの送信

Destinationに作ったキューを指定して、Send。 f:id:begirama:20161009215404p:plain

Queuesメニューから先程作ったキューを選択すると、メッセージが入っていることを確認できます。

f:id:begirama:20161009215441p:plain

AppPot経由でHttpFSでREST化したHadoopとやり取りする

f:id:begirama:20160921004308p:plain

GetAnonymouseToken

  • GETメソッド
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/anonymousTokens?appKey=40c71254aca44664b61635573085ef1d&deviceUDID=bd393116-0b47-4b9e-a186-e64ffc0fbdf2
  • Response
{
  "status": "OK",
  "errCode": 0,
  "description": null,
  "results": "e3b82dd1cd964b76a2ee62c0ec97344f"
}

Login

  • POSTメソッド
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/auth/login
  • Request
{
  "username": "yamada",
  "password": "12345678@X",
  "appId": "apppot.TestApplication",
  "appVersion": "1.0.0",
  "deviceUDID": "bd393116-0b47-4b9e-a186-e64ffc0fbdf2",
  "isPush": "false",
  "companyId": 1
}
  • Response
{
  "status": "OK",
  "errCode": 0,
  "description": null,
  "apppotInfo": "AppPot Server 2.3.6 ",
  "authInfor": {
    "userTokens": "e0f08ddd14d3496d978afdb3530fe64f",
    "validTime": 1474388126575,
    "userId": 2,
    "userInfo": {
      "account": "yamada",
      "firstName": "太郎",
      "lastName": "山田"
    },
    "groupsAndRoles": [
      {
        "groupId": 1,
        "groupName": "開発グループ",
        "description": "",
        "roleId": 2,
        "roleName": "Super Admin"
      }
    ]
  }
}

ファイルの状態確認

  • GETメソッド
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/gateway/hdfs/hoge?user.name=rsogo&op=liststatus

HttpFSの検証で作ったファイルをAppPot経由で参照することができました。

  • Response
{
  "status": "OK",
  "errCode": 0,
  "description": "",
  "results": {
    "hoge": {
      "FileStatuses": {
        "FileStatus": [
          {
            "pathSuffix": "hoge",
            "type": "FILE",
            "length": 19,
            "owner": "rsogo",
            "group": "supergroup",
            "permission": "755",
            "accessTime": 1474384290147,
            "modificationTime": 1474384291294,
            "blockSize": 134217728,
            "replication": 1
          }
        ]
      }
    }
  }
}

ファイルの作成

  • apppot-token: Loginの結果得られたトークン
  • Content-Type: application/octet-stream
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/gateway/hdfs/hoge1?op=CREATE&data=true&user.name=rsogo&overwrite=true

POSTMANを使っていますが、ファイルの添付ができるので、Hadoopに投入したいファイルを選択します。 f:id:begirama:20160921011546p:plain

ファイルの読み込み

http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/gateway/hdfs/hoge1?op=open&user.name=rsogo
{
  "status": "OK",
  "errCode": 0,
  "description": "",
  "results": {
    "hoge1": "aaa,bbb"
  }
}

OK

HttpFSの検証

バイルアプリからのリクエストをリアルタイムにHadoopに投入する方法を検討しています。

begirama.hatenablog.com MuleHDFSコネクタを使おうと思ったけど、Mule CEではだめだったので、今回はだめ。

begirama.hatenablog.com WebHDFSはクライアントから個々のデータノードへアクセスさせるので、社外ネットワークに存在するモバイルアプリに対して、すべてのデータノードを公開することはできないのでNG。

HadoopにWebベースでアクセスする方式を検討して丸1日。 HttpFSにたどり着きました。

WebHDFSをインターフェイスを似せてくれているので、WebHDFSをやった後だと理解は楽だった。 結局使わなかったWebHDFSの検証も無駄ではなかったと思いたい・・・。

設定

$HADOOP_HOME/etc/hadoop/core-site.xml

次の項目を設定する必要があります。

  • hadoop.proxyuser.{USER_NAME}.hosts
  • hadoop.proxyuser.{USER_NAME}.groups

設定ファイルを晒します。

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
       <name>fs.defaultFS</name>
       <value>hdfs://localhost:9000</value>
    </property>
    <property>
       <name>hadoop.proxyuser.rsogo.hosts</name>
       <value>localhost</value>
    </property>
    <property>
       <name>hadoop.proxyuser.rsogo.groups</name>
       <value>*</value>
    </property>
    <property>
       <name>doop.job.ugi</name>
       <value>rsogo,staff</value>
    </property>
</configuration>

rsogoの所は起動ユーザーです。環境に合わせて書き換える必要があります。

HttpFSの起動

$ $HADOOP_HOME/sbin/httpfs.sh start
Setting HTTPFS_HOME:          /usr/local/Cellar/hadoop/2.7.1/libexec
Setting HTTPFS_CONFIG:        /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop
Sourcing:                    /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop/httpfs-env.sh
Setting HTTPFS_LOG:           /usr/local/Cellar/hadoop/2.7.1/libexec/logs
Setting HTTPFS_TEMP:           /usr/local/Cellar/hadoop/2.7.1/libexec/temp
Setting HTTPFS_HTTP_PORT:     14000
Setting HTTPFS_ADMIN_PORT:     14001
Setting HTTPFS_HTTP_HOSTNAME: Ryohei-no-MacBook-Pro.local
Setting HTTPFS_SSL_ENABLED: false
Setting HTTPFS_SSL_KEYSTORE_FILE:     /Users/rsogo/.keystore
Setting HTTPFS_SSL_KEYSTORE_PASS:     password
Setting CATALINA_BASE:       /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/httpfs/tomcat
Setting HTTPFS_CATALINA_HOME:       /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/httpfs/tomcat
Setting CATALINA_OUT:        /usr/local/Cellar/hadoop/2.7.1/libexec/logs/httpfs-catalina.out
Setting CATALINA_PID:        /tmp/httpfs.pid

(略)

tomcatが起動します。14000ポートをデフォルトでリッスンするみたいです。

ファイルの作成

使うのはこのファイル。

$ cat sample.txt 
1   aaa
2   bbb
3   ccc

WebHDFS同様、ファイルの生成には2回のリクエストが必要です。

$ curl -i -X PUT "http://localhost:14000/webhdfs/v1/hoge?op=create&overwrite=true&user.name=rsogo"
HTTP/1.1 307 Temporary Redirect
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474419838951&s=xlo/gx4W9ABt+0kB7NCLb1xVHFk="; Path=/; Expires= , 21-9-2016 01:03:58 GMT; HttpOnly
Location: http://localhost:14000/webhdfs/v1/hoge?op=CREATE&data=true&user.name=rsogo&overwrite=true
Content-Type: application/json
Content-Length: 0
Date: Tue, 20 Sep 2016 15:03:58 GMT

WebHDFSと違うのは、リダイレクトされるホスト名、ポート番号が変わらないというところ。

ヘッダーに"content-type: application/octet-stream"を付ける必要がある。付けないと400 Bad Requestが返ってくるよ。

$ curl -i -X PUT "http://localhost:14000/webhdfs/v1/hoge?op=CREATE&data=true&user.name=rsogo&overwrite=true" -T sample.txt --header "content-type: application/octet-stream"
HTTP/1.1 100 Continue

HTTP/1.1 201 Created
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474420289696&s=DcQxT8lP7kcCBuBbxaaCKyR5LqM="; Path=/; Expires= , 21-9-2016 01:11:29 GMT; HttpOnly
Content-Type: application/json
Content-Length: 0
Date: Tue, 20 Sep 2016 15:11:31 GMT

ファイルの確認

$ curl -i "http://localhost:14000/webhdfs/v1/hoge?op=liststatus&user.name=rsogo"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474420414909&s=ELbVZf2Fqnyg5v0ohAWraURvTC4="; Path=/; Expires= , 21-9-2016 01:13:34 GMT; HttpOnly
Content-Type: application/json
Transfer-Encoding: chunked
Date: Tue, 20 Sep 2016 15:13:34 GMT

{"FileStatuses":{"FileStatus":[{"pathSuffix":"hoge","type":"FILE","length":19,"owner":"rsogo","group":"supergroup","permission":"755","accessTime":1474384290147,"modificationTime":1474384291294,"blockSize":134217728,"replication":1}]}}

ファイルのオープンと読み込み

WebHDFSだと2回に分けて取ってくるけど、1回のリクエストで取ってこれる。

$ curl -i "http://localhost:14000/webhdfs/v1/hoge?op=open&user.name=rsogo"
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474420480591&s=haNBlr1YEaySxzRQSH/bE2Ed8eg="; Path=/; Expires= , 21-9-2016 01:14:40 GMT; HttpOnly
Content-Type: application/octet-stream
Content-Length: 19
Date: Tue, 20 Sep 2016 15:14:40 GMT

1   aaa
2   bbb
3   ccc

HttpFSの停止

$ $HADOOP_HOME/sbin/httpfs.sh stop

WebHDFSの検証

ファイルの作成

ファイルのアクセスは2回に分けて行う必要があります。

作成するファイルの中身はこんな感じです。

$ cat sample.txt 
1   aaa
2   bbb
3   ccc

まず、1回目。

  • PUTメソッドを利用します
  • 操作名はop=create
  • 上書きするoverwrite=true
  • ユーザーはここではhadoopを起動させているユーザーを指定しました。user.name=rsogo
$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/hoge?op=create&overwrite=true&user.name=rsogo"
HTTP/1.1 307 TEMPORARY_REDIRECT
Cache-Control: no-cache
Expires: Tue, 20 Sep 2016 09:59:29 GMT
Date: Tue, 20 Sep 2016 09:59:29 GMT
Pragma: no-cache
Expires: Tue, 20 Sep 2016 09:59:29 GMT
Date: Tue, 20 Sep 2016 09:59:29 GMT
Pragma: no-cache
Content-Type: application/octet-stream
Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474401569719&s=BASKS30rCxMmFD2EbnvPj3Km04A="; Path=/; Expires=?, 20-9-2016 19:59:29 GMT; HttpOnly
Location: http://192.168.10.2:50075/webhdfs/v1/hoge?op=CREATE&user.name=rsogo&namenoderpcaddress=localhost:9000&overwrite=true
Content-Length: 0
Server: Jetty(6.1.26)

ここでLocationにリダイレクト先のURIが返ってきます。 Hadoopはデータが分散して配置されるのでデータが配置されている場所が返ってきます。

2回目。 リダイレクトされたURL宛に-Tで送るファイルを指定して、リクエストを送ります。

$ curl -i -X PUT "http://192.168.10.2:50075/webhdfs/v1/hoge?op=CREATE&user.name=rsogo&namenoderpcaddress=localhost:9000&overwrite=true" -T sample.txt 
HTTP/1.1 100 Continue

HTTP/1.1 201 Created
Location: hdfs://localhost:9000/hoge
Content-Length: 0
Connection: close

201 Createdが返ってくればOK。

ファイルの状態確認

$ curl -i "http://localhost:50070/webhdfs/v1/hoge?op=LISTSTATUS"
HTTP/1.1 200 OK
Cache-Control: no-cache
Expires: Tue, 20 Sep 2016 10:08:06 GMT
Date: Tue, 20 Sep 2016 10:08:06 GMT
Pragma: no-cache
Expires: Tue, 20 Sep 2016 10:08:06 GMT
Date: Tue, 20 Sep 2016 10:08:06 GMT
Pragma: no-cache
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"FileStatuses":{"FileStatus":[
{"accessTime":1474365590688,"blockSize":134217728,"childrenNum":0,"fileId":16409,"group":"supergroup","length":19,"modificationTime":1474365591427,"owner":"rsogo","pathSuffix":"","permission":"755","replication":1,"storagePolicy":0,"type":"FILE"}
]}}

ownerはファイル生成の時に指定したユーザーになってますね。 "owner":"rsogo"

ファイルの読み込み

これもCreate同様、2回に別けて行う必要があります。

1回目。Namenodeに向かって下記のコマンドを投げると、リダイレクトを指示されます。

  • GETメソッドを利用します
  • 操作名はop=OPEN
$ curl -i "http://localhost:50070/webhdfs/v1/hoge?op=OPEN"
HTTP/1.1 307 TEMPORARY_REDIRECT
Cache-Control: no-cache
Expires: Tue, 20 Sep 2016 10:09:39 GMT
Date: Tue, 20 Sep 2016 10:09:39 GMT
Pragma: no-cache
Expires: Tue, 20 Sep 2016 10:09:39 GMT
Date: Tue, 20 Sep 2016 10:09:39 GMT
Pragma: no-cache
Content-Type: application/octet-stream
Location: http://192.168.10.2:50075/webhdfs/v1/hoge?op=OPEN&namenoderpcaddress=localhost:9000&offset=0
Content-Length: 0
Server: Jetty(6.1.26)

2回目、支持されたURIにアクセスすることで、Body部に先程作成したファイルの内容が返ってきます。

  • GETメソッドを利用します
$ curl -i "http://192.168.10.2:50075/webhdfs/v1/hoge?op=OPEN&namenoderpcaddress=localhost:9000&offset=0"
HTTP/1.1 200 OK
Access-Control-Allow-Methods: GET
Access-Control-Allow-Origin: *
Content-Type: application/octet-stream
Connection: close
Content-Length: 19

1   aaa
2   bbb
3   ccc

検証中、自宅のルーターが不安定でIPアドレスが途中で変わってしまうという事象が発生した。 この時、

参考にさせていただいたサイト

tagomoris.hatenablog.com

WebHDFS APIメモ(Hishidama's Hadoop WebHDFS REST API Memo)

公式マニュアル

WebHDFS REST API

Mule HDFSコネクタ5.0.0の検証(Mule EEが必要でした。途中で止めています)

MuleにはHDFSコネクタというのがあって、HDFSに接続できるみたいです。試してみたいと思います。 この検証の結果としては、Mule EEがHDFSコネクタの検証に必要だと分かったため、途中で止めています。

HDFSコネクタの情報源

  • リリースノート

HDFS Connector Release Notes // MuleSoft Documentation

このコネクタは「Select」というカテゴリに分類されています。 コネクタのサポートポリシーはこちらに記載があります。

Anypoint Connectors // MuleSoft Documentation

Product Versioning and Back Support Policy

このAPIリファレンスにRequires Mule Enterprise Licenseと記載があるのですが、最初は見落としていました。

Anypoint Studioの準備

HDFSコネクタを使用するためにAnypoint Studioをインストールします。 Anypoint StudioはこちらのDownload Anypoint Studioからダウンロードして、インストーラを実行して下さい。 Mule Runtime Engine | MuleSoft Developers

Mule Runtimeのインストール

Anypoint Studioでは埋め込みのMule Runtimeを使うことができます。 インストールもAnypoint Studioから実行できます。

Anypoint Studioの「Help」、「Install New Software」からWork withで「Mule Runtimes for Anypoint Studio」を選択します。

「Anypoint Studio Community Runtimes」を選択してインストールします。 f:id:begirama:20160920122639p:plain f:id:begirama:20160920122701p:plain

HDFSコネクタのインストール

リリースノートからインストールマニュアルへのリンクが404になるんだけど、多分Githubで公開されてる、このサイトを見ればOKそう。

hdfs-connector/USAGE.md at master · mulesoft/hdfs-connector · GitHub

HDFSコネクタもAnypoint Studioからインストールできます。

Anypoint Studioの「Help」、「Install New Software」からWork withで「Anypoint Connectors Update Site」を選択します。

f:id:begirama:20160920123833p:plain f:id:begirama:20160920123859p:plain

HDFSコネクタのサンプルを試す

「File」、「Import」からダウンロードしたサンプルのmule-project.xmlを含むディレクトリを開きます。 f:id:begirama:20160920125712p:plainf:id:begirama:20160920125822p:plain

mule-app.propertiesのconfig.nameNodeUriにHadoopがリッスンしているホスト名とポート番号を設定します。

config.nameNodeUri=hdfs://localhost:9000
config.sysUser=

Anypont Studioで「Run」を実行します。

自分の場合は、dw:transform-message周りでエラーがでたので、List_Status_Flowと、Glob_Status_Flowを丸っと削除することで実行することができました。

エラーが出ずに実行できればhttp://localhost:8090/にアクセスします。

f:id:begirama:20160920151036p:plain

Webベースでいろいろな操作ができるようになっているので、適当に実行してみると・・・

f:id:begirama:20160920151259p:plain

Mule EEが必要だったみたいです。 今回はCEで検証する必要があったので、一旦ここまで。

Hiveでファイルからデータをロードする

前記事はこれ。 begirama.hatenablog.com

begirama.hatenablog.com

読み込むファイルを作成します

今回はタブ区切りの次のようなファイルを作成します。

1    aaa
2   bbb
3   ccc

読み込む先のテーブルを作成します

項目の区切りをタブ、行末を改行として、数値型のidと、文字列型のnameを持つテーブルを作成します。項目の区切りとかを、ロード時じゃなくて、テーブル作成時に指定できるのが、RDBMSとは違う感じ。

hive> create table hoge(id int, name string) row format 
> delimited fields terminated by '\t' 
> lines terminated by '\n';
create table hoge(id int, name string) row format delimited fields terminated by '\t' lines terminated by '\n'
OK
Time taken: 0.138 seconds

load dataでさっき作ったファイルを読み込みます

hive> load data local inpath '/Users/rsogo/work/hive/sample.txt' into table hoge;
load data local inpath '/Users/rsogo/work/hive/sample.txt' into table hoge
Loading data to table testdb.hoge
Table testdb.hoge stats: [numFiles=1, totalSize=19]
OK
Time taken: 1.659 seconds

ロードしたデータをselectしてみます。項目名が出るように設定して・・・

hive> set hive.cli.print.header=true;
set hive.cli.print.header=true

selectします。

hive> select * from hoge;
select * from hoge
OK
hoge.id hoge.name
1   aaa
2   bbb
3   ccc
NULL    NULL
Time taken: 0.118 seconds, Fetched: 4 row(s)

あれ、最後の空行が1レコードとして登録されてる。