MacOSXへのApache Kafkaのセットアップ
0.10.0.1
を入れます。
1. Zookeeperの起動
$ bin/zookeeper-server-start.sh config/zookeeper.properties Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 [2016-10-09 22:17:15,900] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-10-09 22:17:15,903] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager) [2016-10-09 22:17:15,905] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager) [2016-10-09 22:17:15,906] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager) [2016-10-09 22:17:15,906] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) [2016-10-09 22:17:15,930] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2016-10-09 22:17:15,930] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain) (中略) [2016-10-09 22:17:15,991] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer) [2016-10-09 22:17:15,991] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2016-10-09 22:17:15,991] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer) [2016-10-09 22:17:16,015] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
2. Kafkaの起動
$ bin/kafka-server-start.sh config/server.properties Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 [2016-10-09 22:18:41,676] INFO KafkaConfig values: advertised.host.name = null metric.reporters = [] quota.producer.default = 9223372036854775807 offsets.topic.num.partitions = 50 log.flush.interval.messages = 9223372036854775807 auto.create.topics.enable = true controller.socket.timeout.ms = 30000 log.flush.interval.ms = null principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder replica.socket.receive.buffer.bytes = 65536 min.insync.replicas = 1 replica.fetch.wait.max.ms = 500 num.recovery.threads.per.data.dir = 1 ssl.keystore.type = JKS sasl.mechanism.inter.broker.protocol = GSSAPI default.replication.factor = 1 ssl.truststore.password = null log.preallocate = false sasl.kerberos.principal.to.local.rules = [DEFAULT] fetch.purgatory.purge.interval.requests = 1000 ssl.endpoint.identification.algorithm = null replica.socket.timeout.ms = 30000 message.max.bytes = 1000012 num.io.threads = 8 offsets.commit.required.acks = -1 log.flush.offset.checkpoint.interval.ms = 60000 delete.topic.enable = false quota.window.size.seconds = 1 ssl.truststore.type = JKS offsets.commit.timeout.ms = 5000 quota.window.num = 11 zookeeper.connect = localhost:2181 authorizer.class.name = num.replica.fetchers = 1 log.retention.ms = null log.roll.jitter.hours = 0 log.cleaner.enable = true offsets.load.buffer.size = 5242880 log.cleaner.delete.retention.ms = 86400000 ssl.client.auth = none controlled.shutdown.max.retries = 3 queued.max.requests = 500 offsets.topic.replication.factor = 3 log.cleaner.threads = 1 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 socket.request.max.bytes = 104857600 ssl.trustmanager.algorithm = PKIX zookeeper.session.timeout.ms = 6000 log.retention.bytes = -1 log.message.timestamp.type = CreateTime sasl.kerberos.min.time.before.relogin = 60000 zookeeper.set.acl = false connections.max.idle.ms = 600000 offsets.retention.minutes = 1440 replica.fetch.backoff.ms = 1000 inter.broker.protocol.version = 0.10.0-IV1 log.retention.hours = 168 num.partitions = 1 broker.id.generation.enable = true listeners = null ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] log.roll.ms = null log.flush.scheduler.interval.ms = 9223372036854775807 ssl.cipher.suites = null log.index.size.max.bytes = 10485760 ssl.keymanager.algorithm = SunX509 security.inter.broker.protocol = PLAINTEXT replica.fetch.max.bytes = 1048576 advertised.port = null log.cleaner.dedupe.buffer.size = 134217728 replica.high.watermark.checkpoint.interval.ms = 5000 log.cleaner.io.buffer.size = 524288 sasl.kerberos.ticket.renew.window.factor = 0.8 zookeeper.connection.timeout.ms = 6000 controlled.shutdown.retry.backoff.ms = 5000 log.roll.hours = 168 log.cleanup.policy = delete host.name = log.roll.jitter.ms = null max.connections.per.ip = 2147483647 offsets.topic.segment.bytes = 104857600 background.threads = 10 quota.consumer.default = 9223372036854775807 request.timeout.ms = 30000 log.message.format.version = 0.10.0-IV1 log.index.interval.bytes = 4096 log.dir = /tmp/kafka-logs log.segment.bytes = 1073741824 log.cleaner.backoff.ms = 15000 offset.metadata.max.bytes = 4096 ssl.truststore.location = null group.max.session.timeout.ms = 300000 ssl.keystore.password = null zookeeper.sync.time.ms = 2000 port = 9092 log.retention.minutes = null log.segment.delete.delay.ms = 60000 log.dirs = /tmp/kafka-logs controlled.shutdown.enable = true compression.type = producer max.connections.per.ip.overrides = log.message.timestamp.difference.max.ms = 9223372036854775807 sasl.kerberos.kinit.cmd = /usr/bin/kinit log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 auto.leader.rebalance.enable = true leader.imbalance.check.interval.seconds = 300 log.cleaner.min.cleanable.ratio = 0.5 replica.lag.time.max.ms = 10000 num.network.threads = 3 ssl.key.password = null reserved.broker.max.id = 1000 metrics.num.samples = 2 socket.send.buffer.bytes = 102400 ssl.protocol = TLS socket.receive.buffer.bytes = 102400 ssl.keystore.location = null replica.fetch.min.bytes = 1 broker.rack = null unclean.leader.election.enable = true sasl.enabled.mechanisms = [GSSAPI] group.min.session.timeout.ms = 6000 log.cleaner.io.buffer.load.factor = 0.9 offsets.retention.check.interval.ms = 600000 producer.purgatory.purge.interval.requests = 1000 metrics.sample.window.ms = 30000 broker.id = 0 offsets.topic.compression.codec = 0 log.retention.check.interval.ms = 300000 advertised.listeners = null leader.imbalance.per.broker.percentage = 10 (kafka.server.KafkaConfig) [2016-10-09 22:18:41,741] INFO starting (kafka.server.KafkaServer) [2016-10-09 22:18:41,748] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2016-10-09 22:18:41,781] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) (略) [2016-10-09 22:18:41,804] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@3e2e18f2 (org.apache.zookeeper.ZooKeeper) [2016-10-09 22:18:41,821] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient) [2016-10-09 22:18:41,825] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) [2016-10-09 22:18:41,897] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2016-10-09 22:18:42,091] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x157a9971f8c0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2016-10-09 22:18:42,093] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2016-10-09 22:18:42,198] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager) [2016-10-09 22:18:42,224] INFO Loading logs. (kafka.log.LogManager) [2016-10-09 22:18:42,233] INFO Logs loading complete. (kafka.log.LogManager) [2016-10-09 22:18:42,436] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager) [2016-10-09 22:18:42,441] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager) [2016-10-09 22:18:42,457] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2016-10-09 22:18:42,509] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2016-10-09 22:18:42,512] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer) [2016-10-09 22:18:42,552] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-10-09 22:18:42,553] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-10-09 22:18:42,638] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-10-09 22:18:42,648] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-10-09 22:18:42,649] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector) [2016-10-09 22:18:42,737] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-10-09 22:18:42,739] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) [2016-10-09 22:18:42,793] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager) [2016-10-09 22:18:42,801] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator) [2016-10-09 22:18:42,802] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator) [2016-10-09 22:18:42,880] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener) [2016-10-09 22:18:42,889] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2016-10-09 22:18:42,890] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper) [2016-10-09 22:18:42,901] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) [2016-10-09 22:18:42,945] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-10-09 22:18:42,949] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2016-10-09 22:18:42,955] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.11.5,9092,PLAINTEXT) (kafka.utils.ZkUtils) [2016-10-09 22:18:42,956] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint) [2016-10-09 22:18:43,046] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser) [2016-10-09 22:18:43,046] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser) [2016-10-09 22:18:43,047] INFO [Kafka Server 0], started (kafka.server.KafkaServer) [2016-10-09 22:19:43,240] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visit,0] (kafka.server.ReplicaFetcherManager) [2016-10-09 22:19:43,295] INFO Completed load of log page_visit-0 with log end offset 0 (kafka.log.Log) [2016-10-09 22:19:43,298] INFO Created log for partition [page_visit,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager) [2016-10-09 22:19:43,299] INFO Partition [page_visit,0] on broker 0: No checkpointed highwatermark is found for partition [page_visit,0] (kafka.cluster.Partition) [2016-10-09 22:28:42,798] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
3. Topicの作成
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic page_visit Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8 WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic "page_visit".
MacOSXへのActiveMQのセットアップ
MQTTを試したいので、ActiveMQをローカルに入れたいと思います。
1. モジュールの入手
http://activemq.apache.org/ 今は5.14.1 が最新みたいです。
Windows Distributionと、Unix/Linux/Cygwin Distributionがあるので、Unix/Linux/Cygwin Distributionの方をダウンロードして、解凍します。
2. 起動
解凍した場所をACTIVEMQ_HOMEとします。$ACTIVEMQ_HOME/bin配下のactivemq start
で起動できます。
$ bin/activemq start INFO: Loading '/Users/rsogo/work/apppot-iot/apache-activemq-5.14.1//bin/env' INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/Users/rsogo/work/apppot-iot/apache-activemq-5.14.1//data/activemq.pid' (pid '48834')
ログは$ACTIVEMQ_HOME/data/activemq.log
にでているっぽい。
$ tail -f data/activemq.log 2016-10-09 21:35:59,708 | INFO | Apache ActiveMQ 5.14.1 (localhost, ID:******) started | org.apache.activemq.broker.BrokerService | main 2016-10-09 21:35:59,710 | INFO | For help or more information please see: http://activemq.apache.org | org.apache.activemq.broker.BrokerService | main 2016-10-09 21:35:59,723 | WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: /Users/rsogo/work/apppot-iot/apache-activemq-5.14.1/data/kahadb only has 24747 mb of usable space. - resetting to maximum available disk space: 24747 mb | org.apache.activemq.broker.BrokerService | main 2016-10-09 21:35:59,726 | WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: /Users/rsogo/work/apppot-iot/apache-activemq-5.14.1/data only has 24747 mb of usable space. - resetting to maximum available disk space: 24747 mb | org.apache.activemq.broker.BrokerService | main 2016-10-09 21:36:00,382 | INFO | No Spring WebApplicationInitializer types detected on classpath | /admin | main 2016-10-09 21:36:00,546 | INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/ | org.apache.activemq.web.WebConsoleStarter | main 2016-10-09 21:36:00,546 | INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/ | org.apache.activemq.web.WebConsoleStarter | main 2016-10-09 21:36:00,615 | INFO | Initializing Spring FrameworkServlet 'dispatcher' | /admin | main 2016-10-09 21:36:00,891 | INFO | No Spring WebApplicationInitializer types detected on classpath | /api | main 2016-10-09 21:36:00,973 | INFO | jolokia-agent: Using policy access restrictor classpath:/jolokia-access.xml | /api | main
3. 管理画面の確認
ログにでている通り、8161ポートでWebConsoleが動いています。
4. キューの作成
5. テストメッセージの送信
Destinationに作ったキューを指定して、Send。
Queuesメニューから先程作ったキューを選択すると、メッセージが入っていることを確認できます。
AppPot経由でHttpFSでREST化したHadoopとやり取りする
GetAnonymouseToken
- GETメソッド
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/anonymousTokens?appKey=40c71254aca44664b61635573085ef1d&deviceUDID=bd393116-0b47-4b9e-a186-e64ffc0fbdf2
- Response
{ "status": "OK", "errCode": 0, "description": null, "results": "e3b82dd1cd964b76a2ee62c0ec97344f" }
Login
- POSTメソッド
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/auth/login
- Request
{ "username": "yamada", "password": "12345678@X", "appId": "apppot.TestApplication", "appVersion": "1.0.0", "deviceUDID": "bd393116-0b47-4b9e-a186-e64ffc0fbdf2", "isPush": "false", "companyId": 1 }
- Response
{ "status": "OK", "errCode": 0, "description": null, "apppotInfo": "AppPot Server 2.3.6 ", "authInfor": { "userTokens": "e0f08ddd14d3496d978afdb3530fe64f", "validTime": 1474388126575, "userId": 2, "userInfo": { "account": "yamada", "firstName": "太郎", "lastName": "山田" }, "groupsAndRoles": [ { "groupId": 1, "groupName": "開発グループ", "description": "", "roleId": 2, "roleName": "Super Admin" } ] } }
ファイルの状態確認
- GETメソッド
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/gateway/hdfs/hoge?user.name=rsogo&op=liststatus
HttpFSの検証で作ったファイルをAppPot経由で参照することができました。
- Response
{ "status": "OK", "errCode": 0, "description": "", "results": { "hoge": { "FileStatuses": { "FileStatus": [ { "pathSuffix": "hoge", "type": "FILE", "length": 19, "owner": "rsogo", "group": "supergroup", "permission": "755", "accessTime": 1474384290147, "modificationTime": 1474384291294, "blockSize": 134217728, "replication": 1 } ] } } } }
ファイルの作成
- apppot-token: Loginの結果得られたトークン
- Content-Type: application/octet-stream
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/gateway/hdfs/hoge1?op=CREATE&data=true&user.name=rsogo&overwrite=true
POSTMANを使っていますが、ファイルの添付ができるので、Hadoopに投入したいファイルを選択します。
ファイルの読み込み
http://localhost:8080/apppot/api/1/apppot.TestApplication/1.0.0/gateway/hdfs/hoge1?op=open&user.name=rsogo
{ "status": "OK", "errCode": 0, "description": "", "results": { "hoge1": "aaa,bbb" } }
OK
HttpFSの検証
モバイルアプリからのリクエストをリアルタイムにHadoopに投入する方法を検討しています。
begirama.hatenablog.com MuleのHDFSコネクタを使おうと思ったけど、Mule CEではだめだったので、今回はだめ。
begirama.hatenablog.com WebHDFSはクライアントから個々のデータノードへアクセスさせるので、社外ネットワークに存在するモバイルアプリに対して、すべてのデータノードを公開することはできないのでNG。
HadoopにWebベースでアクセスする方式を検討して丸1日。 HttpFSにたどり着きました。
WebHDFSをインターフェイスを似せてくれているので、WebHDFSをやった後だと理解は楽だった。 結局使わなかったWebHDFSの検証も無駄ではなかったと思いたい・・・。
設定
$HADOOP_HOME/etc/hadoop/core-site.xml
次の項目を設定する必要があります。
設定ファイルを晒します。
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> <property> <name>hadoop.proxyuser.rsogo.hosts</name> <value>localhost</value> </property> <property> <name>hadoop.proxyuser.rsogo.groups</name> <value>*</value> </property> <property> <name>doop.job.ugi</name> <value>rsogo,staff</value> </property> </configuration>
rsogoの所は起動ユーザーです。環境に合わせて書き換える必要があります。
HttpFSの起動
$ $HADOOP_HOME/sbin/httpfs.sh start Setting HTTPFS_HOME: /usr/local/Cellar/hadoop/2.7.1/libexec Setting HTTPFS_CONFIG: /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop Sourcing: /usr/local/Cellar/hadoop/2.7.1/libexec/etc/hadoop/httpfs-env.sh Setting HTTPFS_LOG: /usr/local/Cellar/hadoop/2.7.1/libexec/logs Setting HTTPFS_TEMP: /usr/local/Cellar/hadoop/2.7.1/libexec/temp Setting HTTPFS_HTTP_PORT: 14000 Setting HTTPFS_ADMIN_PORT: 14001 Setting HTTPFS_HTTP_HOSTNAME: Ryohei-no-MacBook-Pro.local Setting HTTPFS_SSL_ENABLED: false Setting HTTPFS_SSL_KEYSTORE_FILE: /Users/rsogo/.keystore Setting HTTPFS_SSL_KEYSTORE_PASS: password Setting CATALINA_BASE: /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/httpfs/tomcat Setting HTTPFS_CATALINA_HOME: /usr/local/Cellar/hadoop/2.7.1/libexec/share/hadoop/httpfs/tomcat Setting CATALINA_OUT: /usr/local/Cellar/hadoop/2.7.1/libexec/logs/httpfs-catalina.out Setting CATALINA_PID: /tmp/httpfs.pid (略)
tomcatが起動します。14000ポートをデフォルトでリッスンするみたいです。
ファイルの作成
使うのはこのファイル。
$ cat sample.txt 1 aaa 2 bbb 3 ccc
WebHDFS同様、ファイルの生成には2回のリクエストが必要です。
$ curl -i -X PUT "http://localhost:14000/webhdfs/v1/hoge?op=create&overwrite=true&user.name=rsogo" HTTP/1.1 307 Temporary Redirect Server: Apache-Coyote/1.1 Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474419838951&s=xlo/gx4W9ABt+0kB7NCLb1xVHFk="; Path=/; Expires= , 21-9-2016 01:03:58 GMT; HttpOnly Location: http://localhost:14000/webhdfs/v1/hoge?op=CREATE&data=true&user.name=rsogo&overwrite=true Content-Type: application/json Content-Length: 0 Date: Tue, 20 Sep 2016 15:03:58 GMT
WebHDFSと違うのは、リダイレクトされるホスト名、ポート番号が変わらないというところ。
ヘッダーに"content-type: application/octet-stream"
を付ける必要がある。付けないと400 Bad Requestが返ってくるよ。
$ curl -i -X PUT "http://localhost:14000/webhdfs/v1/hoge?op=CREATE&data=true&user.name=rsogo&overwrite=true" -T sample.txt --header "content-type: application/octet-stream" HTTP/1.1 100 Continue HTTP/1.1 201 Created Server: Apache-Coyote/1.1 Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474420289696&s=DcQxT8lP7kcCBuBbxaaCKyR5LqM="; Path=/; Expires= , 21-9-2016 01:11:29 GMT; HttpOnly Content-Type: application/json Content-Length: 0 Date: Tue, 20 Sep 2016 15:11:31 GMT
ファイルの確認
$ curl -i "http://localhost:14000/webhdfs/v1/hoge?op=liststatus&user.name=rsogo" HTTP/1.1 200 OK Server: Apache-Coyote/1.1 Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474420414909&s=ELbVZf2Fqnyg5v0ohAWraURvTC4="; Path=/; Expires= , 21-9-2016 01:13:34 GMT; HttpOnly Content-Type: application/json Transfer-Encoding: chunked Date: Tue, 20 Sep 2016 15:13:34 GMT {"FileStatuses":{"FileStatus":[{"pathSuffix":"hoge","type":"FILE","length":19,"owner":"rsogo","group":"supergroup","permission":"755","accessTime":1474384290147,"modificationTime":1474384291294,"blockSize":134217728,"replication":1}]}}
ファイルのオープンと読み込み
WebHDFSだと2回に分けて取ってくるけど、1回のリクエストで取ってこれる。
$ curl -i "http://localhost:14000/webhdfs/v1/hoge?op=open&user.name=rsogo" HTTP/1.1 200 OK Server: Apache-Coyote/1.1 Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474420480591&s=haNBlr1YEaySxzRQSH/bE2Ed8eg="; Path=/; Expires= , 21-9-2016 01:14:40 GMT; HttpOnly Content-Type: application/octet-stream Content-Length: 19 Date: Tue, 20 Sep 2016 15:14:40 GMT 1 aaa 2 bbb 3 ccc
HttpFSの停止
$ $HADOOP_HOME/sbin/httpfs.sh stop
WebHDFSの検証
ファイルの作成
ファイルのアクセスは2回に分けて行う必要があります。
作成するファイルの中身はこんな感じです。
$ cat sample.txt 1 aaa 2 bbb 3 ccc
まず、1回目。
- PUTメソッドを利用します
- 操作名は
op=create
。 - 上書きする
overwrite=true
。 - ユーザーはここではhadoopを起動させているユーザーを指定しました。
user.name=rsogo
$ curl -i -X PUT "http://localhost:50070/webhdfs/v1/hoge?op=create&overwrite=true&user.name=rsogo" HTTP/1.1 307 TEMPORARY_REDIRECT Cache-Control: no-cache Expires: Tue, 20 Sep 2016 09:59:29 GMT Date: Tue, 20 Sep 2016 09:59:29 GMT Pragma: no-cache Expires: Tue, 20 Sep 2016 09:59:29 GMT Date: Tue, 20 Sep 2016 09:59:29 GMT Pragma: no-cache Content-Type: application/octet-stream Set-Cookie: hadoop.auth="u=rsogo&p=rsogo&t=simple&e=1474401569719&s=BASKS30rCxMmFD2EbnvPj3Km04A="; Path=/; Expires=?, 20-9-2016 19:59:29 GMT; HttpOnly Location: http://192.168.10.2:50075/webhdfs/v1/hoge?op=CREATE&user.name=rsogo&namenoderpcaddress=localhost:9000&overwrite=true Content-Length: 0 Server: Jetty(6.1.26)
ここでLocationにリダイレクト先のURIが返ってきます。 Hadoopはデータが分散して配置されるのでデータが配置されている場所が返ってきます。
2回目。
リダイレクトされたURL宛に-T
で送るファイルを指定して、リクエストを送ります。
$ curl -i -X PUT "http://192.168.10.2:50075/webhdfs/v1/hoge?op=CREATE&user.name=rsogo&namenoderpcaddress=localhost:9000&overwrite=true" -T sample.txt HTTP/1.1 100 Continue HTTP/1.1 201 Created Location: hdfs://localhost:9000/hoge Content-Length: 0 Connection: close
201 Createdが返ってくればOK。
ファイルの状態確認
$ curl -i "http://localhost:50070/webhdfs/v1/hoge?op=LISTSTATUS" HTTP/1.1 200 OK Cache-Control: no-cache Expires: Tue, 20 Sep 2016 10:08:06 GMT Date: Tue, 20 Sep 2016 10:08:06 GMT Pragma: no-cache Expires: Tue, 20 Sep 2016 10:08:06 GMT Date: Tue, 20 Sep 2016 10:08:06 GMT Pragma: no-cache Content-Type: application/json Transfer-Encoding: chunked Server: Jetty(6.1.26) {"FileStatuses":{"FileStatus":[ {"accessTime":1474365590688,"blockSize":134217728,"childrenNum":0,"fileId":16409,"group":"supergroup","length":19,"modificationTime":1474365591427,"owner":"rsogo","pathSuffix":"","permission":"755","replication":1,"storagePolicy":0,"type":"FILE"} ]}}
ownerはファイル生成の時に指定したユーザーになってますね。 "owner":"rsogo"
ファイルの読み込み
これもCreate同様、2回に別けて行う必要があります。
1回目。Namenodeに向かって下記のコマンドを投げると、リダイレクトを指示されます。
- GETメソッドを利用します
- 操作名は
op=OPEN
。
$ curl -i "http://localhost:50070/webhdfs/v1/hoge?op=OPEN" HTTP/1.1 307 TEMPORARY_REDIRECT Cache-Control: no-cache Expires: Tue, 20 Sep 2016 10:09:39 GMT Date: Tue, 20 Sep 2016 10:09:39 GMT Pragma: no-cache Expires: Tue, 20 Sep 2016 10:09:39 GMT Date: Tue, 20 Sep 2016 10:09:39 GMT Pragma: no-cache Content-Type: application/octet-stream Location: http://192.168.10.2:50075/webhdfs/v1/hoge?op=OPEN&namenoderpcaddress=localhost:9000&offset=0 Content-Length: 0 Server: Jetty(6.1.26)
2回目、支持されたURIにアクセスすることで、Body部に先程作成したファイルの内容が返ってきます。
- GETメソッドを利用します
$ curl -i "http://192.168.10.2:50075/webhdfs/v1/hoge?op=OPEN&namenoderpcaddress=localhost:9000&offset=0" HTTP/1.1 200 OK Access-Control-Allow-Methods: GET Access-Control-Allow-Origin: * Content-Type: application/octet-stream Connection: close Content-Length: 19 1 aaa 2 bbb 3 ccc
検証中、自宅のルーターが不安定でIPアドレスが途中で変わってしまうという事象が発生した。 この時、
参考にさせていただいたサイト
WebHDFS APIメモ(Hishidama's Hadoop WebHDFS REST API Memo)
公式マニュアル
Mule HDFSコネクタ5.0.0の検証(Mule EEが必要でした。途中で止めています)
MuleにはHDFSコネクタというのがあって、HDFSに接続できるみたいです。試してみたいと思います。 この検証の結果としては、Mule EEがHDFSコネクタの検証に必要だと分かったため、途中で止めています。
HDFSコネクタの情報源
- リリースノート
HDFS Connector Release Notes // MuleSoft Documentation
このコネクタは「Select」というカテゴリに分類されています。 コネクタのサポートポリシーはこちらに記載があります。
Anypoint Connectors // MuleSoft Documentation
Product Versioning and Back Support Policy
コネクタの入手 GitHub - mulesoft/hdfs-connector: Connects Mule to Hadoop Distributed File System.
サンプル Mule HDFS Connector
このAPIリファレンスにRequires Mule Enterprise License
と記載があるのですが、最初は見落としていました。
- マニュアル HDFS Connector
Anypoint Studioの準備
HDFSコネクタを使用するためにAnypoint Studioをインストールします。 Anypoint StudioはこちらのDownload Anypoint Studioからダウンロードして、インストーラを実行して下さい。 Mule Runtime Engine | MuleSoft Developers
Mule Runtimeのインストール
Anypoint Studioでは埋め込みのMule Runtimeを使うことができます。 インストールもAnypoint Studioから実行できます。
Anypoint Studioの「Help」、「Install New Software」からWork withで「Mule Runtimes for Anypoint Studio」を選択します。
「Anypoint Studio Community Runtimes」を選択してインストールします。
HDFSコネクタのインストール
リリースノートからインストールマニュアルへのリンクが404になるんだけど、多分Githubで公開されてる、このサイトを見ればOKそう。
hdfs-connector/USAGE.md at master · mulesoft/hdfs-connector · GitHub
HDFSコネクタもAnypoint Studioからインストールできます。
Anypoint Studioの「Help」、「Install New Software」からWork withで「Anypoint Connectors Update Site」を選択します。
HDFSコネクタのサンプルを試す
「File」、「Import」からダウンロードしたサンプルのmule-project.xmlを含むディレクトリを開きます。
mule-app.propertiesのconfig.nameNodeUriにHadoopがリッスンしているホスト名とポート番号を設定します。
config.nameNodeUri=hdfs://localhost:9000 config.sysUser=
Anypont Studioで「Run」を実行します。
自分の場合は、dw:transform-message周りでエラーがでたので、List_Status_Flowと、Glob_Status_Flowを丸っと削除することで実行することができました。
エラーが出ずに実行できればhttp://localhost:8090/にアクセスします。
Webベースでいろいろな操作ができるようになっているので、適当に実行してみると・・・
Mule EEが必要だったみたいです。 今回はCEで検証する必要があったので、一旦ここまで。
Hiveでファイルからデータをロードする
前記事はこれ。 begirama.hatenablog.com
読み込むファイルを作成します
今回はタブ区切りの次のようなファイルを作成します。
1 aaa 2 bbb 3 ccc
読み込む先のテーブルを作成します
項目の区切りをタブ、行末を改行として、数値型のidと、文字列型のnameを持つテーブルを作成します。項目の区切りとかを、ロード時じゃなくて、テーブル作成時に指定できるのが、RDBMSとは違う感じ。
hive> create table hoge(id int, name string) row format > delimited fields terminated by '\t' > lines terminated by '\n'; create table hoge(id int, name string) row format delimited fields terminated by '\t' lines terminated by '\n' OK Time taken: 0.138 seconds
load dataでさっき作ったファイルを読み込みます
hive> load data local inpath '/Users/rsogo/work/hive/sample.txt' into table hoge; load data local inpath '/Users/rsogo/work/hive/sample.txt' into table hoge Loading data to table testdb.hoge Table testdb.hoge stats: [numFiles=1, totalSize=19] OK Time taken: 1.659 seconds
ロードしたデータをselectしてみます。項目名が出るように設定して・・・
hive> set hive.cli.print.header=true; set hive.cli.print.header=true
selectします。
hive> select * from hoge; select * from hoge OK hoge.id hoge.name 1 aaa 2 bbb 3 ccc NULL NULL Time taken: 0.118 seconds, Fetched: 4 row(s)
あれ、最後の空行が1レコードとして登録されてる。