
そごうソフトウェア研究所

I write about SOA, development processes, IT architecture, and related topics. I'm on Twitter: @rsogo

Setting up Apache Kafka on Mac OS X

kafka.apache.org

This post installs version 0.10.0.1.
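The post does not show the download step, so here is a minimal sketch of one way to fetch and unpack the release. The Scala 2.11 build, the Apache archive URL, and the helper name `fetch_kafka` are assumptions for illustration, not something taken from the original setup.

```shell
# Assumptions (not from the original post): Scala 2.11 build, Apache archive server.
KAFKA_VERSION="0.10.0.1"
SCALA_VERSION="2.11"
KAFKA_DIR="kafka_${SCALA_VERSION}-${KAFKA_VERSION}"
KAFKA_URL="https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/${KAFKA_DIR}.tgz"

# Hypothetical helper: download the tarball and unpack it in the current directory.
fetch_kafka() {
  curl -fLO "$KAFKA_URL" &&
    tar -xzf "${KAFKA_DIR}.tgz" &&
    echo "unpacked into ./${KAFKA_DIR}"
}

# Show which URL would be downloaded; nothing is fetched until fetch_kafka is called.
echo "$KAFKA_URL"
```

Running `fetch_kafka && cd "$KAFKA_DIR"` would then leave you in the directory that the `bin/...` commands below are run from.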

1. Starting ZooKeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties 
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
[2016-10-09 22:17:15,900] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-10-09 22:17:15,903] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-10-09 22:17:15,905] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-10-09 22:17:15,906] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-10-09 22:17:15,906] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2016-10-09 22:17:15,930] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-10-09 22:17:15,930] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)

(output omitted)

[2016-10-09 22:17:15,991] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-09 22:17:15,991] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-09 22:17:15,991] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-09 22:17:16,015] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
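Once the log reports that it is bound to port 2181, a quick health check from another terminal can confirm that ZooKeeper is actually answering. This is a rough sketch, assuming `nc` is available and that the bundled ZooKeeper responds to the four-letter command `ruok` (the 3.4 line did so by default). The helper name `zk_ruok` is made up for this example.

```shell
# Hypothetical helper: send "ruok" to ZooKeeper and succeed only if it replies "imok".
# Defaults match the port shown in the log above (2181 on localhost).
zk_ruok() {
  host="${1:-localhost}"
  port="${2:-2181}"
  # nc prints whatever the server sends back; errors (e.g. connection refused) are hidden.
  reply="$(printf 'ruok' | nc "$host" "$port" 2>/dev/null)"
  [ "$reply" = "imok" ]
}
```

For example, `zk_ruok && echo "ZooKeeper is up"` prints the message only when the server replies. Some `nc` variants need an extra option to exit after the reply, so treat this as a starting point.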

2. Starting Kafka

$ bin/kafka-server-start.sh config/server.properties 
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
[2016-10-09 22:18:41,676] INFO KafkaConfig values: 
    advertised.host.name = null
    metric.reporters = []
    quota.producer.default = 9223372036854775807
    offsets.topic.num.partitions = 50
    log.flush.interval.messages = 9223372036854775807
    auto.create.topics.enable = true
    controller.socket.timeout.ms = 30000
    log.flush.interval.ms = null
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    replica.socket.receive.buffer.bytes = 65536
    min.insync.replicas = 1
    replica.fetch.wait.max.ms = 500
    num.recovery.threads.per.data.dir = 1
    ssl.keystore.type = JKS
    sasl.mechanism.inter.broker.protocol = GSSAPI
    default.replication.factor = 1
    ssl.truststore.password = null
    log.preallocate = false
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    fetch.purgatory.purge.interval.requests = 1000
    ssl.endpoint.identification.algorithm = null
    replica.socket.timeout.ms = 30000
    message.max.bytes = 1000012
    num.io.threads = 8
    offsets.commit.required.acks = -1
    log.flush.offset.checkpoint.interval.ms = 60000
    delete.topic.enable = false
    quota.window.size.seconds = 1
    ssl.truststore.type = JKS
    offsets.commit.timeout.ms = 5000
    quota.window.num = 11
    zookeeper.connect = localhost:2181
    authorizer.class.name = 
    num.replica.fetchers = 1
    log.retention.ms = null
    log.roll.jitter.hours = 0
    log.cleaner.enable = true
    offsets.load.buffer.size = 5242880
    log.cleaner.delete.retention.ms = 86400000
    ssl.client.auth = none
    controlled.shutdown.max.retries = 3
    queued.max.requests = 500
    offsets.topic.replication.factor = 3
    log.cleaner.threads = 1
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    socket.request.max.bytes = 104857600
    ssl.trustmanager.algorithm = PKIX
    zookeeper.session.timeout.ms = 6000
    log.retention.bytes = -1
    log.message.timestamp.type = CreateTime
    sasl.kerberos.min.time.before.relogin = 60000
    zookeeper.set.acl = false
    connections.max.idle.ms = 600000
    offsets.retention.minutes = 1440
    replica.fetch.backoff.ms = 1000
    inter.broker.protocol.version = 0.10.0-IV1
    log.retention.hours = 168
    num.partitions = 1
    broker.id.generation.enable = true
    listeners = null
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    log.roll.ms = null
    log.flush.scheduler.interval.ms = 9223372036854775807
    ssl.cipher.suites = null
    log.index.size.max.bytes = 10485760
    ssl.keymanager.algorithm = SunX509
    security.inter.broker.protocol = PLAINTEXT
    replica.fetch.max.bytes = 1048576
    advertised.port = null
    log.cleaner.dedupe.buffer.size = 134217728
    replica.high.watermark.checkpoint.interval.ms = 5000
    log.cleaner.io.buffer.size = 524288
    sasl.kerberos.ticket.renew.window.factor = 0.8
    zookeeper.connection.timeout.ms = 6000
    controlled.shutdown.retry.backoff.ms = 5000
    log.roll.hours = 168
    log.cleanup.policy = delete
    host.name = 
    log.roll.jitter.ms = null
    max.connections.per.ip = 2147483647
    offsets.topic.segment.bytes = 104857600
    background.threads = 10
    quota.consumer.default = 9223372036854775807
    request.timeout.ms = 30000
    log.message.format.version = 0.10.0-IV1
    log.index.interval.bytes = 4096
    log.dir = /tmp/kafka-logs
    log.segment.bytes = 1073741824
    log.cleaner.backoff.ms = 15000
    offset.metadata.max.bytes = 4096
    ssl.truststore.location = null
    group.max.session.timeout.ms = 300000
    ssl.keystore.password = null
    zookeeper.sync.time.ms = 2000
    port = 9092
    log.retention.minutes = null
    log.segment.delete.delay.ms = 60000
    log.dirs = /tmp/kafka-logs
    controlled.shutdown.enable = true
    compression.type = producer
    max.connections.per.ip.overrides = 
    log.message.timestamp.difference.max.ms = 9223372036854775807
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    auto.leader.rebalance.enable = true
    leader.imbalance.check.interval.seconds = 300
    log.cleaner.min.cleanable.ratio = 0.5
    replica.lag.time.max.ms = 10000
    num.network.threads = 3
    ssl.key.password = null
    reserved.broker.max.id = 1000
    metrics.num.samples = 2
    socket.send.buffer.bytes = 102400
    ssl.protocol = TLS
    socket.receive.buffer.bytes = 102400
    ssl.keystore.location = null
    replica.fetch.min.bytes = 1
    broker.rack = null
    unclean.leader.election.enable = true
    sasl.enabled.mechanisms = [GSSAPI]
    group.min.session.timeout.ms = 6000
    log.cleaner.io.buffer.load.factor = 0.9
    offsets.retention.check.interval.ms = 600000
    producer.purgatory.purge.interval.requests = 1000
    metrics.sample.window.ms = 30000
    broker.id = 0
    offsets.topic.compression.codec = 0
    log.retention.check.interval.ms = 300000
    advertised.listeners = null
    leader.imbalance.per.broker.percentage = 10
 (kafka.server.KafkaConfig)
[2016-10-09 22:18:41,741] INFO starting (kafka.server.KafkaServer)
[2016-10-09 22:18:41,748] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2016-10-09 22:18:41,781] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)

(output omitted)

[2016-10-09 22:18:41,804] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@3e2e18f2 (org.apache.zookeeper.ZooKeeper)
[2016-10-09 22:18:41,821] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2016-10-09 22:18:41,825] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-10-09 22:18:41,897] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-10-09 22:18:42,091] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x157a9971f8c0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-10-09 22:18:42,093] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-10-09 22:18:42,198] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2016-10-09 22:18:42,224] INFO Loading logs. (kafka.log.LogManager)
[2016-10-09 22:18:42,233] INFO Logs loading complete. (kafka.log.LogManager)
[2016-10-09 22:18:42,436] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-10-09 22:18:42,441] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-10-09 22:18:42,457] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-10-09 22:18:42,509] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-10-09 22:18:42,512] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2016-10-09 22:18:42,552] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,553] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,638] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,648] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,649] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-10-09 22:18:42,737] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,739] INFO [ExpirationReaper-0], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-10-09 22:18:42,793] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-10-09 22:18:42,801] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-10-09 22:18:42,802] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-10-09 22:18:42,880] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-10-09 22:18:42,889] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-10-09 22:18:42,890] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-10-09 22:18:42,901] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-10-09 22:18:42,945] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,949] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-10-09 22:18:42,955] INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT -> EndPoint(192.168.11.5,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-10-09 22:18:42,956] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2016-10-09 22:18:43,046] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-10-09 22:18:43,046] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser)
[2016-10-09 22:18:43,047] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2016-10-09 22:19:43,240] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [page_visit,0] (kafka.server.ReplicaFetcherManager)
[2016-10-09 22:19:43,295] INFO Completed load of log page_visit-0 with log end offset 0 (kafka.log.Log)
[2016-10-09 22:19:43,298] INFO Created log for partition [page_visit,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-10-09 22:19:43,299] INFO Partition [page_visit,0] on broker 0: No checkpointed highwatermark is found for partition [page_visit,0] (kafka.cluster.Partition)
[2016-10-09 22:28:42,798] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
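The KafkaConfig dump in the startup log mixes built-in defaults with the few values actually set in config/server.properties. As a small sketch for telling them apart, the helpers below print only the active lines of a properties file and look up a single key. The function names `active_props` and `prop_value` are hypothetical.

```shell
# Print only the active key=value lines of a properties file,
# skipping comment lines (#) and blank lines.
active_props() {
  grep -Ev '^[[:space:]]*(#|$)' "$1"
}

# Look up one key in the file; prints nothing if the key is not set there.
# Usage: prop_value KEY FILE
prop_value() {
  active_props "$2" |
    awk -F= -v key="$1" '$1 == key { print substr($0, length(key) + 2) }'
}
```

Run from the Kafka directory, `active_props config/server.properties` lists what the file overrides, and `prop_value log.dirs config/server.properties` should agree with the `log.dirs` value in the dump above.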

3. Creating a topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic page_visit
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic "page_visit".
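The warning appears because `page_visit` contains an underscore: in metric names a period and an underscore can end up looking the same, so `page_visit` and `page.visit` could clash. Here is a minimal sketch of that idea, assuming the rule is simply "treat every `.` as `_`". The helper names are made up for illustration and are not part of Kafka's tooling.

```shell
# Map a topic name to the form compared for collisions: every '.' becomes '_'.
# (Assumption: this mirrors the rule behind the warning above.)
metric_safe_name() {
  printf '%s\n' "$1" | tr '.' '_'
}

# Succeed if two topic names would be indistinguishable after that mapping.
topics_collide() {
  [ "$(metric_safe_name "$1")" = "$(metric_safe_name "$2")" ]
}
```

For example, `topics_collide page_visit page.visit && echo "would collide"` prints the message, while `page_visit` and `pagevisit` do not collide. To confirm that the topic itself was created, `bin/kafka-topics.sh --list --zookeeper localhost:2181` should include `page_visit`.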