跟我学Kafka之Controller控制器详解-Java-优质IT资源分享社区

admin
管理员
管理员
  • UID1
  • 粉丝27
  • 关注4
  • 发帖数581
  • 社区居民
  • 忠实会员
  • 原创写手
阅读:157回复:0

  跟我学Kafka之Controller控制器详解

楼主#
更多 发布于:2016-05-30 22:13

我们的kafka源码共享现已进行过许多期了,首要的内容也都共享的差不多了,那么在往后的共享中,首要会集在kafka功能优化和运用。

Kafka集群中的其中一个Broker会被推举为Controller,首要担任Partition办理和副本状况办理,也会履行类似于重分配Partition之类的办理使命。假如当时的Controller失利,会从其他正常的Broker中从头推举Controller。

进入KafkaController.scala文件看到如下代码:

class KafkaController(val config : KafkaConfig,

zkClient: ZkClient, val brokerState: BrokerState) extends Logging with

KafkaMetricsGroup {  this.logIdent = "[Controller " + config.brokerId + "]: "  

private var isRunning = true   private val stateChangeLogger =

KafkaController.stateChangeLogger  val controllerContext = new

ControllerContext(zkClient, config.zkSessionTimeoutMs)  val

partitionStateMachine = new PartitionStateMachine(this)  val replicaStateMachine

= new ReplicaStateMachine(this)  private val controllerElector = new

ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath,

onControllerFailover,    onControllerResignation, config.brokerId)  // have a

separate scheduler for the controller to be able to start and stop independently

of the   // kafka server   private val autoRebalanceScheduler = new

KafkaScheduler(1)  var deleteTopicManager: TopicDeletionManager = null   val

offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext,

config)  private val reassignedPartitionLeaderSelector = new

ReassignedPartitionLeaderSelector(controllerContext)  private val

preferredReplicaPartitionLeaderSelector = new

PreferredReplicaPartitionLeaderSelector(controllerContext)  private val

controlledShutdownPartitionLeaderSelector = new

ControlledShutdownLeaderSelector(controllerContext)  private val

brokerRequestBatch = new ControllerBrokerRequestBatch(this)  private val

partitionReassignedListener = new PartitionsReassignedListener(this)  private

val preferredReplicaElectionListener = new

PreferredReplicaElectionListener(this)

在KafkaController类中界说了许多特点,我们先要点了解下面的PartitionLeaderSelector目标,首要是为分区推举出leader

broker,该trait只界说了一个办法selectLeader,接纳一个TopicAndPartition目标和一个LeaderAndIsr目标。TopicAndPartition表明要选leader的分区,而第二个参数表明zookeeper中保留的该分区的当时leader和ISR记载。该办法会回来一个元组包含了推举出来的leader和ISR以及需求接纳LeaderAndISr恳求的一组副本。

trait PartitionLeaderSelector {      def

selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr:

LeaderAndIsr): (LeaderAndIsr, Seq[Int])}

经过我们上面的代码,能够看到在KafkaController中共界说了五种selector推举器:

1、NoOpLeaderSelector

2、OfflinePartitionLeaderSelector

3、ReassignedPartitionLeaderSelector

4、PreferredReplicaPartitionLeaderSelector

5、ControlledShutdownLeaderSelector

我们在解说这五个挑选器之前,先了解一下在Kafka中Partition的四种状况:

NonExistentPartition ——

这个状况表明该分区要么没有被创立过或曾经被创立过但后面被删去了。

NewPartition ——

分区创立以后就处于NewPartition状况。在这个状况中,分区应该现已分配了副本,但是还没有推举出leader和ISR。

OnlinePartition ——

一旦分区的leader被推选出来,它就处于OnlinePartition状况。

OfflinePartition —— 假如leader推举出来后,leader

broker宕机了,那么该分区就处于OfflinePartition状况。

四种状况的变换联系如下:

NonExistentPartition ->

NewPartition

首先将第一个可用的副本broker作为leader

broker并把一切可用的副本目标都装入ISR,然后写leader和ISR信息到zookeeper中保留

对于这个分区而言,发送LeaderAndIsr恳求到每个可用的副本broker,以及UpdateMetadata恳求到每个可用的broker上

OnlinePartition, OfflinePartition ->

OnlinePartition

为该分区挑选新的leader和ISR以及接纳LeaderAndIsr恳求的一组副本,然后写入leader和ISR信息到zookeeper中保留。

NewPartition, OnlinePartition ->

OfflinePartition

符号分区状况为离线(offline)。

OfflinePartition ->

NonExistentPartition

离线状况符号为不存在分区,表明该分区失利或许被删去。

在介绍完最基本的概念以后,下面我们将要点介绍上面提到过的五种推举器:

1、ReassignedPartitionLeaderSelector

从可用的ISR中挑选第一个作为leader,把当时的ISR作为新的ISR,将重分配的副本调集作为接纳LeaderAndIsr恳求的副本调集。

2、PreferredReplicaPartitionLeaderSelector

假如从assignedReplicas取出的第一个副本即是分区leader的话,则抛出反常,不然将第一个副本设置为分区leader。

3、ControlledShutdownLeaderSelector

将ISR中处于关闭状况的副本从调集中去除去,回来一个新新的ISR调集,然后挑选第一个副本作为leader,然后令当时AR作为接纳LeaderAndIsr恳求的副本。

4、NoOpLeaderSelector

原则上不做任何事情,回来当时的leader和isr。

5、OfflinePartitionLeaderSelector

从活着的ISR中挑选一个broker作为leader,假如ISR中没有活着的副本,则从assignedReplicas中挑选一个副本作为leader,leader推举成功后注册到Zookeeper中,并更新一切的缓存。

一切的leader挑选完成后,都要经过恳求把详细的request路由到对应的handler处理。目前kafka并没有把handler笼统出来,而是每个handler都是一个函数,混在KafkaApi类中。

本来也即是如下的代码:

def handle(request: RequestChannel.Request) {  

 try{      trace("Handling request: " + request.requestObj + " from client: " +

request.remoteAddress)      request.requestId match {        case

RequestKeys.ProduceKey => handleProducerRequest(request)  // producer        

case RequestKeys.FetchKey => handleFetchRequest(request)       // consumer  

      case RequestKeys.OffsetsKey => handleOffsetRequest(request)        case

RequestKeys.MetadataKey => handleTopicMetadataRequest(request)        case

RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)

//成为leader或follower设置同步副本组信息         case RequestKeys.StopReplicaKey =>

handleStopReplicaRequest(request)        case RequestKeys.UpdateMetadataKey

=> handleUpdateMetadataRequest(request)        case

RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)

 //shutdown broker         case RequestKeys.OffsetCommitKey =>

handleOffsetCommitRequest(request)        case RequestKeys.OffsetFetchKey =>

handleOffsetFetchRequest(request)        case requestId => throw new

KafkaException("Unknown api code " + requestId)      }    } catch {      case e:

Throwable =>        request.requestObj.handleError(e, requestChannel,

request)        error("error when handling request

%s".format(request.requestObj), e)    } finally    

 request.apiLocalCompleteTimeMs = SystemTime.milliseconds  }

这儿面的每个恳求在上面给出的连接的文章中都有过解说阐明,在这儿不多解说。

RequestKeys.LeaderAndIsr详细分析

在上面的代码中我们看到ReequestKeys.LeaderAndlst对应的办法本来是KeyhandleLeaderAndIsrRequest。

def handleLeaderAndIsrRequest(request:

RequestChannel.Request) {    // ensureTopicExists is only for client facing

requests     // We can't have the ensureTopicExists check here since the

controller sends it as an advisory to all brokers so they     // stop serving

data to clients for the topic being deleted     val leaderAndIsrRequest =

request.requestObj.asInstanceOf[LeaderAndIsrRequest]    try {      val

(response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest,

offsetManager)      val leaderAndIsrResponse = new

LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)    

 requestChannel.sendResponse(new Response(request, new

BoundedByteBufferSend(leaderAndIsrResponse)))    } catch {      case e:

KafkaStorageException =>        fatal("Disk error during leadership change.",

e)        Runtime.getRuntime.halt(1)    }  }

将request.requestObj变换成LeaderAndIstRequest目标类型。

Sample Flowchart Template.png

流程图阐明

1、假如恳求中controllerEpoch小于当时最新的controllerEpoch,则直接回来ErrorMapping.StaleControllerEpochCode。

2、假如partitionStateInfo中的leader

epoch大于当时ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:

2.1、假如当时brokerid(或许说replica

id)在partitionStateInfo中,则将该partition及partitionStateInfo存入一个名为partitionState的HashMap中。

不然阐明该Broker不在该Partition分配的Replica

list中,将该信息记载于log中

3、假如partitionStateInfo中的leader

epoch小于当时ReplicManager则将相应的Error

code(ErrorMapping.StaleLeaderEpochCode)存入Response中。

4、筛选出partitionState中Leader与当时Broker

ID持平的一切记载存入partitionsTobeLeader中,其它记载存入partitionsToBeFollower中。

假如partitionsTobeLeader不为空,则对其履行makeLeaders方。

假如partitionsToBeFollower不为空,则对其履行makeFollowers办法。

优质IT资源分享社区为你提供此文。

本站有大量优质Java教程视频,资料等资源,包含java基础教程,高级进阶教程等等,教程视频资源涵盖传智播客,极客学院,达内,北大青鸟,猎豹网校等等IT职业培训机构的培训教学视频,价值巨大。欢迎点击下方链接查看。

java教程视频

优质IT资源分享社区(www.itziyuan.top)
一个免费,自由,开放,共享,平等,互助的优质IT资源分享网站。
专注免费分享各大IT培训机构最新培训教学视频,为你的IT学习助力!

!!!回帖受限制请看点击这里!!!
!!!资源失效请在此版块发帖说明!!!

[PS:按 CTRL+D收藏本站网址~]

——“优质IT资源分享社区”管理员专用签名~

本版相似帖子

游客