第三步:kafka的server启动过程 源代码运行内幕机制

第三步:kafka的server启动过程  源代码运行内幕机制

kafka分布式集群启动比spark分布式集群启动相对简化的思考,
1、spark分布式集群节点的启动,只在spark master节点执行start-all.sh,master节点会自动ssh到分布式的其他worker节点,将cmd命令发送过去,在worker节点jvm中自动就加载了main类来启动,如CorseGrainExecutorBackend,从而完成了master与worker的通信。

2、kafka就不一样,kafka在每台broker节点上都需手工启动kafka-server-start.sh,这样就简化了许多,在每个broker节点上就可以通过socket来通信了。

这么理解吧:
1、spark相当于一个小区的一个基站,基站起来了,分布式节点(手机)就自动带起来了,手机可以上网、打电话
2、kafka相当于固定电话,要每户每户进行安装,每一户人家(分布式broker)人工安装好了,就可以打电话了。电话调度executor使用FIFO,先来的电话先通,后来的电话就等待。

以上个人想的,可能描述不一定准确。

1、 kafka.kafka ->main ->kafkaServerStartable.startup

2、进入 kafkaServerStartable.scala -〉def startup() -〉kafka.server.startup()

3、进入kafka.server.scala: canStartup启动

if (canStartup) {
        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime,

true)

brokerState.newState(Starting)

/* start scheduler */ //启动kafka调度器
        kafkaScheduler.startup()

/* setup zookeeper */ //初始化zookeeper
        zkUtils = initZk()

/* start log manager */ //启动日志管理
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

/* generate brokerId */ //生成 brokerId
        config.brokerId =  getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()  //socketServer启动

/* start replica manager */ 启动副本管理
        replicaManager = new ReplicaManager(config, metrics, time,

kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
          isShuttingDown)
        replicaManager.startup()

/* start kafka controller */ //启动kafka控制器
        kafkaController = new KafkaController(config, zkUtils, brokerState,

kafkaMetricsTime, metrics, threadNamePrefix)
        kafkaController.startup()

/* start kafka coordinator */ //启动kafka协调器
        consumerCoordinator = GroupCoordinator.create(config, zkUtils,

replicaManager)
        consumerCoordinator.startup()

/* Get the authorizer and initialize it if one is specified.*///启动

安全认证
        authorizer = Option(config.authorizerClassName).filter

(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

/* start processing requests */ //启动kakfa的请求线程池
        apis = new KafkaApis(socketServer.requestChannel, replicaManager,

consumerCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache,

metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,

socketServer.requestChannel, apis, config.numIoThreads)
        brokerState.newState(RunningAsBroker)

Mx4jLoader.maybeLoad()

/* start dynamic config manager */ //kafka动态配置管理
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic

-> new TopicConfigHandler(logManager),
                                                           ConfigType.Client

-> new ClientIdConfigHandler(apis.quotaManagers))

// Apply all existing client configs to the ClientIdConfigHandler to

bootstrap the overrides
        // TODO: Move this logic to DynamicConfigManager
        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach

{
          case (clientId, properties) => dynamicConfigHandlers

(ConfigType.Client).processConfigChanges(clientId, properties)
        }

// Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils,

dynamicConfigHandlers)
        dynamicConfigManager.startup()

/* tell everyone we are alive */ // endpoint节点监听
        val listeners = config.advertisedListeners.map {case(protocol,

endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort

(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners,

zkUtils)
        kafkaHealthcheck.startup()

/* register broker metrics */ 监控
        registerStats()

shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }

4、broker的状态探秘:
 brokerState.newState(Starting)->trait BrokerStates
 进入trait BrokerStates.scala

**
 * Broker states are the possible state that a kafka broker can be in.
 * A broker should be only in one state at a time.
 * The expected state transition with the following defined states is:
 *
 *                +-----------+
 *                |Not Running|
 *                +-----+-----+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Starting   +--+
 *                +-----+-----+  | +----+------------+
 *                      |        +>+RecoveringFrom   |
 *                      v          |UncleanShutdown  |
 * +----------+     +-----+-----+  +-------+---------+
 * |RunningAs |     |RunningAs  |            |
 * |Controller+<--->+Broker     +<-----------+
 * +----------+     +-----+-----+
 *        |              |
 *        |              v
 *        |       +-----+------------+
 *        |-----> |PendingControlled |
 *                |Shutdown          |
 *                +-----+------------+
 *                      |
 *                      v
 *               +-----+----------+
 *               |BrokerShutting  |
 *               |Down            |
 *               +-----+----------+
 *                     |
 *                     v
 *               +-----+-----+
 *               |Not Running|
 *               +-----------+
 *


case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state:

Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object RunningAsController extends BrokerStates { val state: Byte = 4 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte

= 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }


5、kafkaScheduler调度器
kafkaScheduler.startup()->kafkaScheduler.scala
进入kafkaScheduler.scala

override def startup() {
    debug("Initializing task scheduler.")
    this synchronized {
      if(isStarted)
        throw new IllegalStateException("This scheduler has already been started!")
      executor = new ScheduledThreadPoolExecutor(threads)
      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
      executor.setThreadFactory(new ThreadFactory() {
                                  def newThread(runnable: Runnable): Thread =
                                    Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
                                })
    }
  }

www.zeeklog.com  - 第三步:kafka的server启动过程  源代码运行内幕机制

Read more

60个“特征工程”计算函数(Python代码)

60个“特征工程”计算函数(Python代码)

转自:coggle数据科学 近期一些朋友询问我关于如何做特征工程的问题,有没有什么适合初学者的有效操作。 特征工程的问题往往需要具体问题具体分析,当然也有一些暴力的策略,可以在竞赛初赛前期可以带来较大提升,而很多竞赛往往依赖这些信息就可以拿到非常好的效果,剩余的则需要结合业务逻辑以及很多其他的技巧,此处我们将平时用得最多的聚合操作罗列在下方。 最近刚好看到一篇文章汇总了非常多的聚合函数,就摘录在下方,供许多初入竞赛的朋友参考。 聚合特征汇总 pandas自带的聚合函数 * 其它重要聚合函数 其它重要聚合函数&分类分别如下。 def median(x):     return np.median(x) def variation_coefficient(x):     mean = np.mean(x)     if mean != 0:         return np.std(x) / mean     else:         return np.nan def variance(x):     return

By Ne0inhk
90w,确实可以封神了!

90w,确实可以封神了!

要说24年一定最热的技术,还得是AIGC! 前段时间阿里旗下的开源项目,登上GitHub热榜! AI大热,如今ChatGPT的优异表现,必然会出现各种细分场景应用的工具软件,和大量岗位项目! 山雨欲来风满楼,强人工智能的出现,所有科技公司已经开始巨量扩招此领域的人才。算法的岗位,近三个月已经增长68%!这件事在HR届也是相当震撼的。 目前各行各业都不景气的市场,人工智能岗位却一直保持常青!甚至同属AI边缘岗都比其他岗薪资高40%! 与此同时,AI算法岗上岸也不简单,竞争激烈,好公司核心岗位不用说,谁都想去。 所以事实就是,想要上岸,门槛也逐渐变高,项目经历、实习经历都很重要,越早明白这个道理就越能提前建立起自己的优势。 但我在b站逛知识区的时候,经常看到有些同学,因为一些客观原因导致无法参加实习,这种情况下,如果你想提升背景,增加项目经历的话,可以试试这个《CV/NLP 算法工程师培养计划》。 目前已经有上千位同学通过该计划拿到offer了,最新一期学员就业薪资最高能拿到78K!年薪94w! 优势就是有BAT大厂讲师带领,手把手带做AI真实企业项目(包含CV、NLP等

By Ne0inhk
再见nohup!试试这个神器,Python Supervisor!

再见nohup!试试这个神器,Python Supervisor!

👇我的小册 45章教程:() ,原价299,限时特价2杯咖啡,满100人涨10元。 作者丨Ais137 https://juejin.cn/post/7354406980784373798 1. 概述 Supervisor 是一个 C/S 架构的进程监控与管理工具,本文主要介绍其基本用法和部分高级特性,用于解决部署持久化进程的稳定性问题。 2. 问题场景 在实际的工作中,往往会有部署持久化进程的需求,比如接口服务进程,又或者是消费者进程等。这类进程通常是作为后台进程持久化运行的。 一般的部署方法是通过 nohup cmd & 命令来部署。但是这种方式有个弊端是在某些情况下无法保证目标进程的稳定性运行,有的时候 nohup 运行的后台任务会因为未知原因中断,从而导致服务或者消费中断,进而影响项目的正常运行。 为了解决上述问题,通过引入 Supervisor 来部署持久化进程,提高系统运行的稳定性。 3. Supervisor 简介 Supervisor is a client/

By Ne0inhk
第一本给程序员看的AI Agent图书上市了!

第一本给程序员看的AI Agent图书上市了!

AI Agent火爆到什么程度? OpenAI创始人奥特曼预测,未来各行各业,每一个人都可以拥有一个AI Agent;比尔·盖茨在2023年层预言:AI Agent将彻底改变人机交互方式,并颠覆整个软件行业;吴恩达教授在AI Ascent 2024演讲中高赞:AI Agent是一个令人兴奋的趋势,所有从事AI开发的人都应该关注。而国内的各科技巨头也纷纷布局AI Agent平台,如:钉钉的AI PaaS、百度智能云千帆大模型平台等等。 Agent 是未来最重要的智能化工具。对于程序员来说,是时候将目光转向大模型的应用开发了,率先抢占AI的下一个风口AI Agent。 小异带来一本新书《大模型应用开发 动手做 AI Agent》,这本书由《GPT图解》的作者黄佳老师创作,从0到1手把手教你做AI Agent。现在下单享受5折特惠! ▼点击下方,即可5折起购书 有这样一本秘籍在手,程序员们这下放心了吧,让我们先来揭开 Agent 的神秘面纱。 AI Agent 面面观

By Ne0inhk