Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

本节讲解Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?数据峰值及流量变化的不稳定有2个层面:1)第一个层面就是数据确实不稳定,例如晚上11点的时候访问流量特别高,相对其他时间而言表现为不稳定。2)第二个层面:数据是没问题的,数据流动的速度是匀速或接近于匀速,但是在处理的过程中发生了故障或者GC的时候耽误了时间,导致计算延迟,Web监控台上也会看到峰值的变化。这两种情况无论哪种情况,都不是一个稳定的系统能够避免的问题,解决这个问题有很多方式,本节提一个Spark本身自带的一个Backpressure机制,即数据的反压机制。

Backpressure机制的基本思想是根据上一次计算的Job信息来评估决定下一个Job数据接受的速度。根据上一个Job执行结束之后对Job的统计信息,例如Job延迟了多长时间等,基于这个统计信息,SparkStreaming的算法会评估下一个Batch Duration的时间窗口应该以什么样的速度接收数据。这里有一个问题,接收数据的速度怎么进行限制?接收数据的过程也是消费数据的过程,SparkStreaming在接收数据的时候必须把当前的数据接收完毕,才能接收下一条的数据,根据上一个作业执行的情况,根据自己的算法进行一个判断,下一个BatchDuration必须以什么样的速度接收数据,这就是所谓的Backpressure反压机制。本章节将透彻的讲解Backpressure反压机制如何实现。

在生产环境下,Spark Streaming有一个监控的平台,最简单的是使用SparkWeb View的原生页面,或者借助第三方的监控软件去监控,如监测到Spark Streaming以匀速的速度前进,表明SparkStreaming的处理是稳定的。如果在监控平台上监控Kafka剩余数据的情况,Kafka突然出现一个波峰,这个时候需考虑怎么去解决,如果SparkStreaming开启了Backpressure的机制,数据突然变大或者发生了GC,Backpressure能够动态改变消费的速度,对于Spark而言是非常有意义的。在过去的很多项目中,我们都会开启SparkStreaming的Backpressure反压机制。

从源码的角度,Backpressure反压机制从Driver来考虑,我们系统的看一下反压机制的内容。RateController继承至StreamingListener监听器,当一个Job完成的时候,StreamingListener是监听器的一种,肯定会进行注册,注册的时候会调用RateController。RateController有自己的子类,会调自己具体的业务。其中computeAndPublish方法计算新的限制速率并异步发布。

RateController.scala源代码:

1.         private[streaming] abstractclass RateController(val streamUID: Int, rateEstimator:

2.         RateEstimator) extendsStreamingListener with Serializable {

3.         ……

4.           private def computeAndPublish(time: Long,elems: Long, workDelay: Long, waitDelay: Long): Unit =

5.             Future[Unit] {

6.               val newRate = rateEstimator.compute(time,elems, workDelay, waitDelay)

7.               newRate.foreach { s =>

8.                 rateLimit.set(s.toLong)

9.                 publish(getLatestRate())

10.            }

11.          },

 



我们先看一下RateController在什么时候启动和注册的?从原理的角度考虑什么时候启动RateController?RateController是在JobScheduler中启动的,为什么是在JobScheduler中,原因非常简单,JobScheduler知道作业什么时候完成,作业完成之后获得上一次作业的统计信息。

在Jobscheduler中查找RateController的实例化,每次作业完成的时候,Jobscheduler知道。这是Driver级别的,从start方法开始查找,从getInputStreams中把InputStreams关联上RateController,InputStreams有RateController。

JobScheduler.scala的start方法源代码:

1.          def start(): Unit = synchronized {

2.             if (eventLoop != null) return // schedulerhas already been started

3.          

4.             logDebug("Starting JobScheduler")

5.             eventLoop = newEventLoop[JobSchedulerEvent]("JobScheduler") {

6.               override protected def onReceive(event:JobSchedulerEvent): Unit = processEvent(event)

7.          

8.               override protected def onError(e:Throwable): Unit = reportError("Error in job scheduler", e)

9.             }

10.          eventLoop.start()

11.       

12.          // attach rate controllers of input streamsto receive batch completion updates

13.          for {

14.            inputDStream <-ssc.graph.getInputStreams

15.            rateController <-inputDStream.rateController

16.          } ssc.addStreamingListener(rateController)

17.       

18.          listenerBus.start()

19.          receiverTracker = new ReceiverTracker(ssc)

20.          inputInfoTracker = newInputInfoTracker(ssc)

21.       

22.          val executorAllocClient:ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {

23.            case b: ExecutorAllocationClient =>b.asInstanceOf[ExecutorAllocationClient]

24.            case _ => null

25.          }

26.       

27.          executorAllocationManager =ExecutorAllocationManager.createIfEnabled(

28.            executorAllocClient,

29.            receiverTracker,

30.            ssc.conf,

31.            ssc.graph.batchDuration.milliseconds,

32.            clock)

33.          executorAllocationManager.foreach(ssc.addStreamingListener)

34.          receiverTracker.start()

35.          jobGenerator.start()

36.          executorAllocationManager.foreach(_.start())

37.          logInfo("Started JobScheduler")

38.        }



我们先看一下InputStreams怎么来的,先看一下InputStreamsSuite的测试代码,通过ssc.socketTextStream创建一个networkStream:

InputStreamsSuite.scala源代码:

1.          val networkStream = ssc.socketTextStream(

2.                   "localhost",testServer.port, StorageLevel.MEMORY_AND_DISK)

 

ssc.socketTextStream创建的是socketTextStream。

StreamingContext.scala的socketTextStream源代码:

1.            defsocketTextStream(

2.               hostname: String,

3.               port: Int,

4.               storageLevel: StorageLevel =StorageLevel.MEMORY_AND_DISK_SER_2

5.             ): ReceiverInputDStream[String] =withNamedScope("socket text stream") {

6.             socketStream[String](hostname, port,SocketReceiver.bytesToLines, storageLevel)

7.           }

 

   其中socketStream的源代码如下。

   StreamingContext.scala的socketStream源代码:

1.            defsocketStream[T: ClassTag](

2.               hostname: String,

3.               port: Int,

4.               converter: (InputStream) =>Iterator[T],

5.               storageLevel: StorageLevel

6.             ): ReceiverInputDStream[T] = {

7.             new SocketInputDStream[T](this, hostname,port, converter, storageLevel)

8.           }

 

这里创建出SocketInputDStream,就是InputStreams的来源。SocketInputDStream中有一个getReceiver,在getReceiver方法中创建SocketReceiver。

SocketInputDStream.scala源代码:

1.          private[streaming]

2.         class SocketInputDStream[T:ClassTag](

3.             _ssc: StreamingContext,

4.             host: String,

5.             port: Int,

6.             bytesToObjects: InputStream =>Iterator[T],

7.             storageLevel: StorageLevel

8.           ) extends ReceiverInputDStream[T](_ssc) {

9.          

10.        def getReceiver(): Receiver[T] = {

11.          new SocketReceiver(host, port,bytesToObjects, storageLevel)

12.        }

13.      }

 

SocketReceiver的onStart方法会一直不断的循环,循环进行 receive()。

SocketInputDStream.scala的SocketReceiver源代码:

1.          private[streaming]

2.         class SocketReceiver[T:ClassTag](

3.             host: String,

4.             port: Int,

5.             bytesToObjects: InputStream =>Iterator[T],

6.             storageLevel: StorageLevel

7.           ) extends Receiver[T](storageLevel) withLogging {

8.          

9.           private var socket: Socket = _

10.       

11.        def onStart() {

12.       

13.          logInfo(s"Connecting to$host:$port")

14.          try {

15.            socket = new Socket(host, port)

16.          } catch {

17.            case e: ConnectException =>

18.              restart(s"Error connecting to$host:$port", e)

19.              return

20.          }

21.          logInfo(s"Connected to$host:$port")

22.       

23.          // Start the thread that receives data overa connection

24.          new Thread("Socket Receiver") {

25.            setDaemon(true)

26.            override def run() { receive() }

27.          }.start()

28.        }

 

receive方法中socket.getInputStream()是接收到的数据,放入到iterator中,只要iterator不停止就会一直循环,将数据存储起来,数据存储不是一件简单的事情。注意:receive方法运行在Executor上。

SocketInputDStream.scala的receive方法源代码:

1.            defreceive() {

2.             try {

3.               val iterator =bytesToObjects(socket.getInputStream())

4.               while(!isStopped &&iterator.hasNext) {

5.                 store(iterator.next())

6.               }

7.               if (!isStopped()) {

8.                 restart("Socket data stream had nomore data")

9.               } else {

10.              logInfo("Stopped receiving")

11.            }

12.          } catch {

13.            case NonFatal(e) =>

14.              logWarning("Error receivingdata", e)

15.              restart("Error receivingdata", e)

16.          } finally {

17.            onStop()

18.          }

19.        }

 

 Receiver的store方法将单个接收到的数据存储到Spark内存中,这些单个数据在被放入Spark内存前将被合并到数据块。

 Receiver.scala的源代码:

1.            defstore(dataItem: T) {

2.             supervisor.pushSingle(dataItem)

3.           }

 

ReceiverSupervisor的pushSingle方法将单个数据项推到后端数据存储,这里无具体实现,需看ReceiverSupervisor具体子类ReceiverSupervisorImpl的实现。

ReceiverSupervisor.scala源代码:

1.          def pushSingle(data: Any): Unit

 

子类ReceiverSupervisorImpl的pushSingle方法将一条记录放入defaultBlockGenerator中。

ReceiverSupervisorImpl.scala的源代码:

1.            defpushSingle(data: Any) {

2.             defaultBlockGenerator.addData(data)

3.           }

 

defaultBlockGenerator的addData方法将单个数据项推入缓冲区。addData中有个关键的方法 waitToPush(), waitToPush()是关键点。

BlockGenerator.scala源代码:

1.          def addData(data: Any): Unit = {

2.             if (state == Active) {

3.               waitToPush()

4.               synchronized {

5.                 if (state == Active) {

6.                   currentBuffer += data

7.                 } else {

8.                   throw new SparkException(

9.                     "Cannot add data asBlockGenerator has not been started or has been stopped")

10.              }

11.            }

12.          } else {

13.            throw new SparkException(

14.              "Cannot add data as BlockGeneratorhas not been started or has been stopped")

15.          }

16.        }

 

waitToPush方法里面调用rateLimiter.acquire()方法,如果addData的waitToPush方法不执行,则addData方法中在waitToPush方法之后的synchronized同步块代码将都不执行,数据不能存储。

RateLimiter.scala源代码:

1.           private[receiver] abstract classRateLimiter(conf: SparkConf) extends Logging {

2.          

3.           // treated as an upper limit

4.           private val maxRateLimit =conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)

5.           private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)

6.          

7.           def waitToPush() {

8.             rateLimiter.acquire()

9.           }



RateLimiter.java位于com.google.common.util.concurrent包中,acquire方法从{@code RateLimiter}申请获取一个许可证,一直阻塞直到请求被授予。 其中调用acquire(1)方法,acquire(1)方法中reserve有个同步互斥信号量synchronized (mutex()),如果接收数据进行存储,需拿到一个许可证Ticket,如果没有这个令牌,就无法存储数据,也就限定了接收数据的速度。在receiver中有具体限定接收数据的方式。

RateLimiter.java的源代码:

1.         public void acquire() {

2.                 acquire(1);

3.             }

4.         ......

5.             public void acquire(int permits) {

6.                 checkPermits(permits);

7.                 long microsToWait;

8.                 synchronized (mutex) {

9.                     microsToWait =reserveNextTicket(permits, readSafeMicros());

10.              }

11.              ticker.sleepMicrosUninterruptibly(microsToWait);

12.          }

 



回到Driver端的JobScheduler,JobScheduler在Start的时候,每个inputDStream都有一个rateController,for循环遍历获得rateController,然后将rateController交给上下文的ssc.addStreamingListener监听器,进行注册。这里listenerBus是scheduler级别的listenerBus,listenerBus收到相关的信息肯定会告诉监听器。

 StreamingContext.scala的addStreamingListener源代码:

1.            defaddStreamingListener(streamingListener: StreamingListener) {

2.             scheduler.listenerBus.addListener(streamingListener)

3.           }

4.         ……

5.         private[spark] val listeners =new CopyOnWriteArrayList[L]

6.         …….

7.           final def addListener(listener: L): Unit = {

8.             listeners.add(listener)

9.           }

 

其中listenerBus是StreamingListenerBus,StreamingListenerBus在什么时候启动的?在构建JobScheduler的时候会获取StreamingListenerBus的实例。

JobScheduler.scala源代码:

1.         val listenerBus = newStreamingListenerBus(ssc.sparkContext.listenerBus)

 

StreamingListenerBus中有很多事件,包括receiverStarted、receiverError、receiverStopped等事件。其中batchStarted是batch处理开始,batchSubmitted是batch提交,而batchCompleted比较关键。

StreamingListenerBus.scala源代码:

1.          private[streaming] classStreamingListenerBus(sparkListenerBus: LiveListenerBus)

2.           extends SparkListener withListenerBus[StreamingListener, StreamingListenerEvent] {

3.         ......

4.         protected override defdoPostEvent(

5.               listener: StreamingListener,

6.               event: StreamingListenerEvent): Unit = {

7.             event match {

8.               case receiverStarted:StreamingListenerReceiverStarted =>

9.                 listener.onReceiverStarted(receiverStarted)

10.            case receiverError:StreamingListenerReceiverError =>

11.              listener.onReceiverError(receiverError)

12.            case receiverStopped:StreamingListenerReceiverStopped =>

13.              listener.onReceiverStopped(receiverStopped)

14.            case batchSubmitted:StreamingListenerBatchSubmitted =>

15.              listener.onBatchSubmitted(batchSubmitted)

16.            case batchStarted:StreamingListenerBatchStarted =>

17.              listener.onBatchStarted(batchStarted)

18.            case batchCompleted: StreamingListenerBatchCompleted=>

19.              listener.onBatchCompleted(batchCompleted)

20.            case outputOperationStarted:StreamingListenerOutputOperationStarted =>

21.              listener.onOutputOperationStarted(outputOperationStarted)

22.            case outputOperationCompleted: StreamingListenerOutputOperationCompleted=>

23.              listener.onOutputOperationCompleted(outputOperationCompleted)

24.            case streamingStarted:StreamingListenerStreamingStarted =>

25.              listener.onStreamingStarted(streamingStarted)

26.            case _ =>

27.          }

28.        }

 


batchCompleted的时候,从StreamingListener的角度讲,StreamingListener是一个trait,onBatchCompleted方法中无具体实现。具体方法在子类实现,StreamingListener有很多具体的子类,其中一个子类是RateController。

StreamingListener.scala源代码:

1.         trait StreamingListener {

2.         ……

3.         defonBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

 



子类RateController会调用onBatchCompleted方法,onBatchCompleted方法是个关键点。获取batchInfo的streamIdToInputInfo作为elements,for循环遍历获取workDelay、waitDelay等信息,这些信息对于第三方的监控也是非常重要的,然后通过computeAndPublish方法发布信息。在computeAndPublish方法中,rateEstimator根据传入的时间、elems、workDelay、waitDelay等信息评估新的更加合适的Rate,在newRate.foreach循环遍历中,将值赋值到rateLimit,然后publish进行发布,这里publish方法无具体实现,RateController的子类是ReceiverRateController,其publish方法将新的速率交给receiverTracker。

RateController.scala源代码:

1.            override def onBatchCompleted(batchCompleted:StreamingListenerBatchCompleted) {

2.             val elements =batchCompleted.batchInfo.streamIdToInputInfo

3.          

4.             for {

5.               processingEnd <-batchCompleted.batchInfo.processingEndTime

6.               workDelay <-batchCompleted.batchInfo.processingDelay

7.               waitDelay <- batchCompleted.batchInfo.schedulingDelay

8.               elems <-elements.get(streamUID).map(_.numRecords)

9.             } computeAndPublish(processingEnd, elems,workDelay, waitDelay)

10.        }

11.      }

12.      ……

13.      protected def publish(rate:Long): Unit

 

在ReceiverInputDStream子类中,新的控制速率交给receiverTracker,然后receiverTracker调用sendRateUpdate方法。因为有很多RateController,因此这里会设置一个具体的ID。

ReceiverInputDStream.scala源代码

1.           private[streaming] classReceiverRateController(id: Int, estimator: RateEstimator)

2.               extends RateController(id, estimator) {

3.             override def publish(rate: Long): Unit =

4.               ssc.scheduler.receiverTracker.sendRateUpdate(id,rate)

5.           }

 

在sendRateUpdate方法中,将ID传进去,这里变成了streamUID,每个Stream会attach一个RateController,然后endpoint调用send方法发送UpdateReceiverRateLimit(streamUID,newRate)信息。

ReceiverTracker.scala源代码:

1.            defsendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {

2.             if (isTrackerStarted) {

3.               endpoint.send(UpdateReceiverRateLimit(streamUID,newRate))

4.             }

5.           }



endpoint.send是RPC的通信机制,endpoint在哪里实例化的?在start方法中通过ssc.env.rpcEnv.setupEndpoint方法创建endpoint实例,创建一个ReceiverTrackerEndpoint。然后在ReceiverTrackerEndpoint中接收UpdateReceiverRateLimit消息 ,从receiverTrackingInfos中根据streamUID获取info的相关信息,然后基于info获取endpoint 的通信句柄,endpoint就是ReceiverTrackingInfo的Rpc通信实体。

ReceiverTracker.scala源代码:

1.          private var endpoint: RpcEndpointRef = null

2.         ……

3.         def start(): Unit =synchronized {

4.             if (isTrackerStarted) {

5.               throw newSparkException("ReceiverTracker already started")

6.             }

7.          

8.             if (!receiverInputStreams.isEmpty) {

9.               endpoint = ssc.env.rpcEnv.setupEndpoint(

10.              "ReceiverTracker", newReceiverTrackerEndpoint(ssc.env.rpcEnv))

11.            if (!skipReceiverLaunch)launchReceivers()

12.            logInfo("ReceiverTrackerstarted")

13.            trackerState = Started

14.          }

15.        }

16.      ……

17.      private classReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extendsThreadSafeRpcEndpoint {

18.      ......

19.         case UpdateReceiverRateLimit(streamUID,newRate) =>

20.              for (info <-receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {

21.                eP.send(UpdateRateLimit(newRate))

22.              }

 

ReceiverTracker.scala在HashMap数据结构中根据ID获取ReceiverTrackingInfo信息。receiverTrackingInfos跟踪所有接收者的信息,关键是接收者ID,其值是接收者的信息,只能被ReceiverTrackerEndpoint访问。

ReceiverTracker.scala

1.           privateval receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

 

ReceiverTrackingInfo的成员变量中有一个endpoint,是Option[RpcEndpointRef]类型。

ReceiverTrackingInfo.scala源代码:

2.          private[streaming] case classReceiverTrackingInfo(

3.             receiverId: Int,

4.             state: ReceiverState,

5.             scheduledLocations:Option[Seq[TaskLocation]],

6.             runningExecutor: Option[ExecutorCacheTaskLocation],

7.             name: Option[String] = None,

8.             endpoint: Option[RpcEndpointRef] = None,

9.             errorInfo: Option[ReceiverErrorInfo] =None) {

 

在ReceiverSupervisorImpl.scala中查找一下内部的RPC,这里有endpoint,endpoint是注册上去的,RpcEndpointRef从Driver的ReceiverTracker 接收消息,这里包括了UpdateRateLimit消息,接收到UpdateRateLimit消息以后,在registeredBlockGenerators调用updateRate方法。

ReceiverSupervisorImpl.scala

1.         private[streaming] class ReceiverSupervisorImpl(

2.             receiver: Receiver[_],

3.             env: SparkEnv,

4.             hadoopConf: Configuration,

5.             checkpointDirOption: Option[String]

6.           ) extends ReceiverSupervisor(receiver,env.conf) with Logging {

7.         …….

8.          private val endpoint =env.rpcEnv.setupEndpoint(

9.             "Receiver-" + streamId +"-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {

10.            override val rpcEnv: RpcEnv = env.rpcEnv

11.       

12.            override def receive:PartialFunction[Any, Unit] = {

13.              case StopReceiver =>

14.                logInfo("Received stopsignal")

15.                ReceiverSupervisorImpl.this.stop("Stoppedby driver", None)

16.              case CleanupOldBlocks(threshTime) =>

17.                logDebug("Received delete oldbatch signal")

18.                cleanupOldBlocks(threshTime)

19.              case UpdateRateLimit(eps) =>

20.                logInfo(s"Received a new ratelimit: $eps.")

21.                registeredBlockGenerators.asScala.foreach{ bg =>

22.                  bg.updateRate(eps)

23.                }

24.            }

25.          })

 

updateRate方法中会设置新的接收速率,但不会超过最大速率spark.streaming.receiver.maxRate。启动Spark Streaming的Backpressure机制,要设置最大速率spark.streaming.receiver.maxRate ,这是安全保障机制,因为有时集群处理确实有限,Backpressure反压机制可以让接收速度获得极大的提升,但是要量力而行;另外一方面,Spark Streaming运行在Yarn上,资源可能有限定,不是指物理资源的限定,而是指Spark Streaming可能只能用50%的资源,所以这里设置maxRate,获得新的速率newRate以后,在rateLimiter进行设置。如果newRate大于0,maxRateLimit大于0,取newRate及maxRateLimit的最小值作为新的速率。

RateLimiter.scala的源代码:

1.            private[receiver] def updateRate(newRate:Long): Unit =

2.             if (newRate > 0) {

3.               if (maxRateLimit > 0) {

4.                 rateLimiter.setRate(newRate.min(maxRateLimit))

5.               } else {

6.                 rateLimiter.setRate(newRate)

7.               }

8.             }

 

RateLimiter.java 的setRate方法中加了一个互斥信号锁,获取每秒钟能接收多少条记录及处理多少条记录,然后调用doSetRate方法根据已有的数据和最新的数据设置速率,doSetRate交给具体的子类实现。

com.google.common.util.concurrent.RateLimiter.java的源代码:

1.         public final voidsetRate(double permitsPerSecond) {

2.                 Preconditions.checkArgument(permitsPerSecond> 0.0

3.                         &&!Double.isNaN(permitsPerSecond), "rate must be positive");

4.                 synchronized (mutex) {

5.                     resync(readSafeMicros());

6.                     double stableIntervalMicros =TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;

7.                     this.stableIntervalMicros =stableIntervalMicros;

8.                     doSetRate(permitsPerSecond,stableIntervalMicros);

9.                 }

10.          }

11.       

12.          abstract void doSetRate(doublepermitsPerSecond, double stableIntervalMicros);

 

doSetRate方法由具体的子类实现,Bursty类是RateLimiter的子类,com.google.common.util.concurrent.RateLimiter.java是Google提供的,源码在guava-14.0.1-sources中。在存储数据的时候,必须根据速率rate存储数据,这就是本章节谈的Backpressure机制。整体思路较简单,每次计算BatchDuration Job执行完成的时候,就有JobCompleted的统计信息发回来,根据统计信息计算新的Rate,将新的rate进行远程通信交给Executor,Executor根据接收到的信息重新设置Rate,每次接收数据的时候肯定根据Rate决定每秒接收多少数据。这样就动态改变了速度,每次计算上一个作业的执行情况。

com.google.common.util.concurrent.RateLimiter.java的源代码:

1.         private static class Burstyextends RateLimiter {

2.                 Bursty(SleepingTicker ticker) {

3.                     super(ticker);

4.                 }

5.          

6.                 @Override

7.                 void doSetRate(double permitsPerSecond,double stableIntervalMicros) {

8.                     double oldMaxPermits =this.maxPermits;

9.                     /*

10.                   * We allow the equivalent work ofup to one second to be granted with zero waiting, if the

11.                   * rate limiter has been unused foras much. This is to avoid potentially producing tiny

12.                   * wait interval between subsequentrequests for sufficiently large rates, which would

13.                   * unnecessarily overconstrain thethread scheduler.

14.                   */

15.                  maxPermits = permitsPerSecond; //one second worth of permits

16.                  storedPermits = (oldMaxPermits ==0.0)

17.                          ? 0.0 // initial state

18.                          : storedPermits *maxPermits / oldMaxPermits;

19.              }

20.       

21.              @Override

22.              long storedPermitsToWaitTime(doublestoredPermits, double permitsToTake) {

23.                  return 0L;

24.              }

25.          }

 




Read more

60个“特征工程”计算函数(Python代码)

60个“特征工程”计算函数(Python代码)

转自:coggle数据科学 近期一些朋友询问我关于如何做特征工程的问题,有没有什么适合初学者的有效操作。 特征工程的问题往往需要具体问题具体分析,当然也有一些暴力的策略,可以在竞赛初赛前期可以带来较大提升,而很多竞赛往往依赖这些信息就可以拿到非常好的效果,剩余的则需要结合业务逻辑以及很多其他的技巧,此处我们将平时用得最多的聚合操作罗列在下方。 最近刚好看到一篇文章汇总了非常多的聚合函数,就摘录在下方,供许多初入竞赛的朋友参考。 聚合特征汇总 pandas自带的聚合函数 * 其它重要聚合函数 其它重要聚合函数&分类分别如下。 def median(x):     return np.median(x) def variation_coefficient(x):     mean = np.mean(x)     if mean != 0:         return np.std(x) / mean     else:         return np.nan def variance(x):     return

By Ne0inhk
90w,确实可以封神了!

90w,确实可以封神了!

要说24年一定最热的技术,还得是AIGC! 前段时间阿里旗下的开源项目,登上GitHub热榜! AI大热,如今ChatGPT的优异表现,必然会出现各种细分场景应用的工具软件,和大量岗位项目! 山雨欲来风满楼,强人工智能的出现,所有科技公司已经开始巨量扩招此领域的人才。算法的岗位,近三个月已经增长68%!这件事在HR届也是相当震撼的。 目前各行各业都不景气的市场,人工智能岗位却一直保持常青!甚至同属AI边缘岗都比其他岗薪资高40%! 与此同时,AI算法岗上岸也不简单,竞争激烈,好公司核心岗位不用说,谁都想去。 所以事实就是,想要上岸,门槛也逐渐变高,项目经历、实习经历都很重要,越早明白这个道理就越能提前建立起自己的优势。 但我在b站逛知识区的时候,经常看到有些同学,因为一些客观原因导致无法参加实习,这种情况下,如果你想提升背景,增加项目经历的话,可以试试这个《CV/NLP 算法工程师培养计划》。 目前已经有上千位同学通过该计划拿到offer了,最新一期学员就业薪资最高能拿到78K!年薪94w! 优势就是有BAT大厂讲师带领,手把手带做AI真实企业项目(包含CV、NLP等

By Ne0inhk
再见nohup!试试这个神器,Python Supervisor!

再见nohup!试试这个神器,Python Supervisor!

👇我的小册 45章教程:() ,原价299,限时特价2杯咖啡,满100人涨10元。 作者丨Ais137 https://juejin.cn/post/7354406980784373798 1. 概述 Supervisor 是一个 C/S 架构的进程监控与管理工具,本文主要介绍其基本用法和部分高级特性,用于解决部署持久化进程的稳定性问题。 2. 问题场景 在实际的工作中,往往会有部署持久化进程的需求,比如接口服务进程,又或者是消费者进程等。这类进程通常是作为后台进程持久化运行的。 一般的部署方法是通过 nohup cmd & 命令来部署。但是这种方式有个弊端是在某些情况下无法保证目标进程的稳定性运行,有的时候 nohup 运行的后台任务会因为未知原因中断,从而导致服务或者消费中断,进而影响项目的正常运行。 为了解决上述问题,通过引入 Supervisor 来部署持久化进程,提高系统运行的稳定性。 3. Supervisor 简介 Supervisor is a client/

By Ne0inhk
第一本给程序员看的AI Agent图书上市了!

第一本给程序员看的AI Agent图书上市了!

AI Agent火爆到什么程度? OpenAI创始人奥特曼预测,未来各行各业,每一个人都可以拥有一个AI Agent;比尔·盖茨在2023年层预言:AI Agent将彻底改变人机交互方式,并颠覆整个软件行业;吴恩达教授在AI Ascent 2024演讲中高赞:AI Agent是一个令人兴奋的趋势,所有从事AI开发的人都应该关注。而国内的各科技巨头也纷纷布局AI Agent平台,如:钉钉的AI PaaS、百度智能云千帆大模型平台等等。 Agent 是未来最重要的智能化工具。对于程序员来说,是时候将目光转向大模型的应用开发了,率先抢占AI的下一个风口AI Agent。 小异带来一本新书《大模型应用开发 动手做 AI Agent》,这本书由《GPT图解》的作者黄佳老师创作,从0到1手把手教你做AI Agent。现在下单享受5折特惠! ▼点击下方,即可5折起购书 有这样一本秘籍在手,程序员们这下放心了吧,让我们先来揭开 Agent 的神秘面纱。 AI Agent 面面观

By Ne0inhk