/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.io.{File, IOException}
import java.net.SocketTimeoutException
import java.util
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

import com.yammer.metrics.core.Gauge
import kafka.api.KAFKA_0_9_0
import kafka.cluster.Broker
import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
import kafka.controller.KafkaController
import kafka.coordinator.group.GroupCoordinator
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.{LogConfig, LogManager}
import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
import kafka.network.{BlockingChannel, SocketServer}
import kafka.security.CredentialProvider
import kafka.security.auth.Authorizer
import kafka.utils._
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{ClusterResource, Node}

import scala.collection.JavaConverters._
import scala.collection.{Seq, Map, mutable}

object KafkaServer {

  /**
   * Copies the subset of broker properties that are relevant to logs into a new map keyed by the
   * corresponding `LogConfig` property names.
   *
   * The properties are listed out individually since the names are slightly different in each config class.
   *
   * @param kafkaConfig the broker configuration to read the values from
   * @return a newly created map from log property name to the value taken from `kafkaConfig`
   */
  private[kafka] def copyKafkaConfigToLog(kafkaConfig: KafkaConfig): java.util.Map[String, Object] = {
    val logProps = new util.HashMap[String, Object]()
    logProps.put(LogConfig.SegmentBytesProp, kafkaConfig.logSegmentBytes)
    logProps.put(LogConfig.SegmentMsProp, kafkaConfig.logRollTimeMillis)
    logProps.put(LogConfig.SegmentJitterMsProp, kafkaConfig.logRollTimeJitterMillis)
    logProps.put(LogConfig.SegmentIndexBytesProp, kafkaConfig.logIndexSizeMaxBytes)
    logProps.put(LogConfig.FlushMessagesProp, kafkaConfig.logFlushIntervalMessages)
    logProps.put(LogConfig.FlushMsProp, kafkaConfig.logFlushIntervalMs)
    logProps.put(LogConfig.RetentionBytesProp, kafkaConfig.logRetentionBytes)
    logProps.put(LogConfig.RetentionMsProp, kafkaConfig.logRetentionTimeMillis: java.lang.Long)
    logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
    logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
    logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
    logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
    logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
    logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
    logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
    logProps.put(LogConfig.MinInSyncReplicasProp, kafkaConfig.minInSyncReplicas)
    logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
    logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
    logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
    logProps
  }

  /**
   * Builds a `MetricConfig` from the broker's metric settings: number of samples, recording level and
   * sample window (in milliseconds).
   *
   * @param kafkaConfig the broker configuration to read the metric settings from
   * @return the populated metric configuration
   */
  private[server] def metricConfig(kafkaConfig: KafkaConfig): MetricConfig = {
    new MetricConfig()
      .samples(kafkaConfig.metricNumSamples)
      .recordLevel(Sensor.RecordingLevel.forName(kafkaConfig.metricRecordingLevel))
      .timeWindow(kafkaConfig.metricSampleWindowMs, TimeUnit.MILLISECONDS)
  }

}

/**
 * Represents the lifecycle of a single Kafka broker. Handles all functionality required
 * to start up and shutdown a single Kafka node.
 */
class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None,
                  kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List()) extends Logging with KafkaMetricsGroup {

  // Lifecycle flags; `shutdown()` below reads and updates them to guard against concurrent or premature calls.
  private val startupComplete = new AtomicBoolean(false)
  private val isShuttingDown = new AtomicBoolean(false)
  private val isStartingUp = new AtomicBoolean(false)

  // Counted down at the very end of `shutdown()`; `awaitShutdown()` blocks on it.
  private var shutdownLatch = new CountDownLatch(1)

  private val jmxPrefix: String = "kafka.server"

  // The component references below start out unset (null / None); `shutdown()` checks each one before closing it.
  var metrics: Metrics = null

  val brokerState: BrokerState = new BrokerState

  var apis: KafkaApis = null
  var authorizer: Option[Authorizer] = None
  var socketServer: SocketServer = null
  var requestHandlerPool: KafkaRequestHandlerPool = null

  var logDirFailureChannel: LogDirFailureChannel = null
  var logManager: LogManager = null

  var replicaManager: ReplicaManager = null
  var adminManager: AdminManager = null

  var dynamicConfigHandlers: Map[String, ConfigHandler] = null
  var dynamicConfigManager: DynamicConfigManager = null
  var credentialProvider: CredentialProvider = null

  var groupCoordinator: GroupCoordinator = null

  var transactionCoordinator: TransactionCoordinator = null

  var kafkaController: KafkaController = null

  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

  var kafkaHealthcheck: KafkaHealthcheck = null
  var metadataCache: MetadataCache = null
  var quotaManagers: QuotaFactory.QuotaManagers = null

  var zkUtils: ZkUtils = null
  val correlationId: AtomicInteger = new AtomicInteger(0)
  val brokerMetaPropsFile = "meta.properties"
  // One checkpoint per configured log directory, backed by `<logDir>/meta.properties`.
  val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap

  private var _clusterId: String = null
  private var _brokerTopicStats: BrokerTopicStats = null

  def clusterId: String = _clusterId

  private[kafka] def brokerTopicStats = _brokerTopicStats

  // Exposes the current broker state value.
  newGauge(
    "BrokerState",
    new Gauge[Int] {
      def value = brokerState.currentState
    }
  )

  // Exposes the cluster id (null until `_clusterId` has been assigned).
  newGauge(
    "ClusterId",
    new Gauge[String] {
      def value = clusterId
    }
  )

  // Exposes the number of metrics currently registered in the default yammer registry.
  newGauge(
    "yammer-metrics-count",
    new Gauge[Int] {
      def value = {
        com.yammer.metrics.Metrics.defaultRegistry().allMetrics().size()
      }
    }
  )

  /**
   * Connects to ZooKeeper using `config.zkConnect` and sets up the common paths.
   *
   * If the connect string contains a chroot (everything from the first "/" on), a temporary client is
   * first opened against the connect string without the chroot so that the chroot path can be created.
   *
   * @return the connected `ZkUtils` instance
   * @throws java.lang.SecurityException if secure ACLs are enabled in the config but
   *                                     `JaasUtils.isZkSecurityEnabled()` reports false
   */
  private def initZk(): ZkUtils = {
    info(s"Connecting to zookeeper on ${config.zkConnect}")

    val chrootIndex = config.zkConnect.indexOf("/")
    val chrootOption = {
      if (chrootIndex > 0) Some(config.zkConnect.substring(chrootIndex))
      else None
    }

    val secureAclsEnabled = config.zkEnableSecureAcls
    val isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled()

    if (secureAclsEnabled && !isZkSecurityEnabled)
      throw new java.lang.SecurityException(s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the verification of the JAAS login file failed.")

    chrootOption.foreach { chroot =>
      val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
      val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
                                              sessionTimeout = config.zkSessionTimeoutMs,
                                              connectionTimeout = config.zkConnectionTimeoutMs,
                                              secureAclsEnabled)
      // Always release the temporary client, even if creating the chroot path fails.
      try {
        zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
        info(s"Created zookeeper path $chroot")
      } finally
        zkClientForChrootCreation.close()
    }

    val zkUtils = ZkUtils(config.zkConnect,
                          sessionTimeout = config.zkSessionTimeoutMs,
                          connectionTimeout = config.zkConnectionTimeoutMs,
                          secureAclsEnabled)
    zkUtils.setupCommonPaths()
    zkUtils
  }

  /**
   * Returns the cluster id reported by `zkUtils.getClusterId`, or, when none is present, the result of
   * `zkUtils.createOrGetClusterId` called with a newly generated base64-encoded UUID.
   *
   * @param zkUtils the ZooKeeper client to query
   * @return the cluster id
   */
  def getOrGenerateClusterId(zkUtils: ZkUtils): String = {
    zkUtils.getClusterId.getOrElse(zkUtils.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
  }

  /**
   * Performs controlled shutdown.
   *
   * Only does anything when startup has completed and `config.controlledShutdownEnable` is set. In that case
   * the current controller is asked to perform a controlled shutdown of this broker, retrying up to
   * `config.controlledShutdownMaxRetries` times with `config.controlledShutdownRetryBackoffMs` between attempts.
   * If every attempt fails, a warning is logged and the caller proceeds with an unclean shutdown.
   */
  private def controlledShutdown(): Unit = {

    // Builds the client-side node for a broker using its inter-broker listener endpoint.
    def node(broker: Broker): Node = {
      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
      new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
    }

    val socketTimeoutMs = config.controllerSocketTimeoutMs

    // Runs up to `retries` attempts; returns true as soon as the controller reports success with no
    // partitions remaining, false if all attempts were used up.
    def doControlledShutdown(retries: Int): Boolean = {
      val metadataUpdater = new ManualMetadataUpdater()
      val networkClient = {
        val channelBuilder = ChannelBuilders.clientChannelBuilder(
          config.interBrokerSecurityProtocol,
          JaasContext.Type.SERVER,
          config,
          config.interBrokerListenerName,
          config.saslMechanismInterBrokerProtocol,
          config.saslInterBrokerHandshakeRequestEnable)
        val selector = new Selector(
          NetworkReceive.UNLIMITED,
          config.connectionsMaxIdleMs,
          metrics,
          time,
          "kafka-server-controlled-shutdown",
          Map.empty.asJava,
          false,
          channelBuilder
        )
        new NetworkClient(
          selector,
          metadataUpdater,
          config.brokerId.toString,
          1,
          0,
          0,
          Selectable.USE_DEFAULT_BUFFER_SIZE,
          Selectable.USE_DEFAULT_BUFFER_SIZE,
          config.requestTimeoutMs,
          time,
          false,
          new ApiVersions)
      }

      var shutdownSucceeded: Boolean = false

      try {

        var remainingRetries = retries
        var prevController: Broker = null
        var ioException = false

        while (!shutdownSucceeded && remainingRetries > 0) {
          remainingRetries = remainingRetries - 1

          // 1. Find the controller and establish a connection to it.

          // Get the current controller info. This is to ensure we use the most recent info to issue the
          // controlled shutdown request
          val controllerId = zkUtils.getController()
          // If the broker info lookup returns None, ignore and try again
          zkUtils.getBrokerInfo(controllerId).foreach { broker =>
            // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
            // attempt, connect to the most recent controller
            if (ioException || broker != prevController) {

              ioException = false

              if (prevController != null)
                networkClient.close(node(prevController).idString)

              prevController = broker
              metadataUpdater.setNodes(Seq(node(prevController)).asJava)
            }
          }

          // 2. issue a controlled shutdown to the controller
          if (prevController != null) {
            try {

              if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
                throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")

              // send the controlled shutdown request
              val controlledShutdownApiVersion: Short =
                if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
                else 1
              val controlledShutdownRequest = new ControlledShutdownRequest.Builder(config.brokerId,
                controlledShutdownApiVersion)
              val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
                time.milliseconds(), true)
              val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)

              val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
              if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) {
                shutdownSucceeded = true
                info("Controlled shutdown succeeded")
              }
              else {
                info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
                info("Error code from controller: %d".format(shutdownResponse.error.code))
              }
            }
            catch {
              case ioe: IOException =>
                ioException = true
                warn("Error during controlled shutdown, possibly because leader movement took longer than the configured controller.socket.timeout.ms and/or request.timeout.ms: %s".format(ioe.getMessage))
                // ignore and try again
            }
          }
          // Back off (and announce a retry) only when another attempt will actually follow; after the
          // final attempt there is nothing left to wait for.
          if (!shutdownSucceeded && remainingRetries > 0) {
            Thread.sleep(config.controlledShutdownRetryBackoffMs)
            warn("Retrying controlled shutdown after the previous attempt failed...")
          }
        }
      }
      finally
        networkClient.close()

      shutdownSucceeded
    }

    if (startupComplete.get() && config.controlledShutdownEnable) {
      // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
      // of time and try again for a configured number of retries. If all the attempt fails, we simply force
      // the shutdown.
      info("Starting controlled shutdown")

      brokerState.newState(PendingControlledShutdown)

      val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)

      if (!shutdownSucceeded)
        warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
    }
  }

  /**
   * Shutdown API for shutting down a single instance of the Kafka server.
   *
   * Attempts a controlled shutdown first and then shuts down or closes, in this order and each only if it has
   * been set: the socket server, the request handler pool, the scheduler, the APIs, the authorizer, the admin
   * manager, the transaction and group coordinators, the replica manager, the log manager, the controller,
   * the ZooKeeper client, the metrics and the broker topic stats. Failures of individual steps are swallowed
   * via `CoreUtils.swallow` so that the remaining steps still run.
   *
   * @throws IllegalStateException if the server is still starting up
   */
  def shutdown(): Unit = {
    try {
      info("shutting down")

      if (isStartingUp.get)
        throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")

      // To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
      // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
      // `true` at the end of this method.
      if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
        CoreUtils.swallow(controlledShutdown())
        brokerState.newState(BrokerShuttingDown)
        if (socketServer != null)
          CoreUtils.swallow(socketServer.shutdown())
        if (requestHandlerPool != null)
          CoreUtils.swallow(requestHandlerPool.shutdown())
        CoreUtils.swallow(kafkaScheduler.shutdown())
        if (apis != null)
          CoreUtils.swallow(apis.close())
        CoreUtils.swallow(authorizer.foreach(_.close()))
        if (adminManager != null)
          CoreUtils.swallow(adminManager.shutdown())
        if (transactionCoordinator != null)
          CoreUtils.swallow(transactionCoordinator.shutdown())
        if (groupCoordinator != null)
          CoreUtils.swallow(groupCoordinator.shutdown())
        if (replicaManager != null)
          CoreUtils.swallow(replicaManager.shutdown())
        if (logManager != null)
          CoreUtils.swallow(logManager.shutdown())
        if (kafkaController != null)
          CoreUtils.swallow(kafkaController.shutdown())
        if (zkUtils != null)
          CoreUtils.swallow(zkUtils.close())
        if (metrics != null)
          CoreUtils.swallow(metrics.close())
        if (brokerTopicStats != null)
          CoreUtils.swallow(brokerTopicStats.close())

        brokerState.newState(NotRunning)

        startupComplete.set(false)
        isShuttingDown.set(false)
        CoreUtils.swallow(AppInfoParser.unregisterAppInfo(jmxPrefix, config.brokerId.toString))
        shutdownLatch.countDown()
        info("shut down completed")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer shutdown.", e)
        isShuttingDown.set(false)
        throw e
    }
  }

  /**
   * After calling shutdown(), use this API to wait until the shutdown is complete
   */
  def awaitShutdown(): Unit = shutdownLatch.await()

  /** Returns the current log manager reference (null if it has not been set). */
  def getLogManager(): LogManager = logManager

  /** Returns the port the socket server reports as bound for the given listener. */
  def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)

  /**
    * Generates new brokerId if enabled or reads from meta.properties based on following conditions
    *