kafka学习笔记(四)kafka的日志模块伯安知心

日志段源码位于Kafka的core工程下,具体文件位置是core/src/main/scala/kafka/log/LogSegment.scala。实际上,所有日志结构部分的源码都在core的kafka.log包下。该文件下定义了三个Scala对象:LogSegmentclass;LogSegmentobject;LogFlushStatsobject。LogFlushStats结尾有个Stats,它是做统计用的,主要负责为日志落盘进行计时。每个日志段由两个核心组件构成:日志和索引。当然,这里的索引泛指广义的索引文件。另外,这段注释还给出了一个重要的事实:每个日志段都有一个起始位移值(BaseOffset),而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大。

下面,我分批次给出比较关键的代码片段,并对其进行解释。首先,我们看下LogSegment的定义:

每个日志段对象保存自己的起始位移baseOffset——这是非常重要的属性!事实上,你在磁盘上看到的文件名就是baseOffset的值。每个LogSegment对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。

对于一个日志段而言,最重要的方法就是写入消息和读取消息了,它们分别对应着源码中的append方法和read方法。另外,recover方法同样很关键,它是Broker重启后恢复日志段的操作逻辑。

第二步:代码调用ensureOffsetInRange方法确保输入参数最大位移值是合法的。那怎么判断是不是合法呢?标准就是看它与日志段起始位移的差值是否在整数范围内,即largestOffset-baseOffset的值是不是介于[0,Int.MAXVALUE]之间。在极个别的情况下,这个差值可能会越界,这时,append方法就会抛出异常,阻止后续的消息写入。一旦你碰到这个问题,你需要做的是升级你的Kafka版本,因为这是由已知的Bug导致的。

第三步:待这些做完之后,append方法调用FileRecords的append方法执行真正的写入。前面说过了,专栏后面我们会详细介绍FileRecords类。这里你只需要知道它的工作是将内存中的消息对象写入到操作系统的页缓存就可以了。

第五步:append方法的最后一步就是更新索引项和写入的字节数了。我在前面说过,日志段每写入4KB数据就要写入一个索引项。当已写入字节数超过了4KB之后,append方法会调用索引对象的append方法新增索引项,同时清空已写入字节数,以备下次重新累积计算。

好了,append方法我就解释完了。下面我们来看read方法,了解下读取日志段的具体操作。

read方法接收4个输入参数。startOffset:要读取的第一条消息的位移;maxSize:能读取的最大字节数;maxPosition:能读到的最大文件位置;minOneMessage:是否允许在消息体过大时至少返回第一条消息。前3个参数的含义很好理解,我重点说下第4个。当这个参数为true时,即使出现消息体字节数超过了maxSize的情形,read方法依然能返回至少一条消息。引入这个参数主要是为了确保不出现消费饿死的情况。

逻辑很简单,我们一步步来看下。第一步是调用translateOffset方法定位要读取的起始文件位置(startPosition)。输入参数startOffset仅仅是位移值,Kafka需要根据索引信息找到对应的物理文件位置才能开始读取消息。待确定了读取起始位置,日志段代码需要根据这部分信息以及maxSize和maxPosition参数共同计算要读取的总字节数。举个例子,假设maxSize=100,maxPosition=300,startPosition=250,那么read方法只能读取50字节,因为maxPosition-startPosition=50。我们把它和maxSize参数相比较,其中的最小值就是最终能够读取的总字节数。最后一步是调用FileRecords的slice方法,从指定位置读取指定大小的消息集合。

recover开始时,代码依次调用索引对象的reset方法清空所有的索引文件,之后会开始遍历日志段中的所有消息集合或消息批次(RecordBatch)。对于读取到的每个消息集合,日志段必须要确保它们是合法的,这主要体现在两个方面:该集合中的消息必须要符合Kafka定义的二进制格式;该集合中最后一条消息的位移值不能越界,即它与日志段起始位移的差值必须是一个正整数值。

你可以认为,日志是日志段的容器,里面定义了很多管理日志段的操作。坦率地说,如果看Kafka源码却不看Log,就跟你买了这门课却不知道作者是谁一样。在我看来,Log对象是Kafka源码(特别是Broker端)最核心的部分,没有之一。

Log源码位于Kafkacore工程的log源码包下,文件名是Log.scala。总体上,该文件定义了10个类和对象,如下图所示:

图中括号里的C表示Class,O表示Object。还记得我在上节课提到过的伴生对象吗?没错,同时定义同名的Class和Object,就属于Scala中的伴生对象用法。我们先来看伴生对象,也就是LogAppendInfo、Log和RollParams。

Log(C):Log源码中最核心的代码。这里我先卖个关子,一会儿细聊。Log(O):同理,Log伴生类的工厂方法,定义了很多常量以及一些辅助方法。

RollParams(C):定义用于控制日志段是否切分(Roll)的数据结构。

RollParams(O):同理,RollParams伴生类的工厂方法。

除了这3组伴生对象之外,还有4类源码。LogMetricNames:定义了Log对象的监控指标。LogOffsetSnapshot:封装分区所有位移元数据的容器类。LogReadInfo:封装读取日志返回的数据及其元数据。CompletedTxn:记录已完成事务的元数据,主要用于构建事务索引。

下面,我会按照这些类和对象的重要程度,对它们一一进行拆解。首先,咱们先说说Log类及其伴生对象。考虑到伴生对象多用于保存静态变量和静态方法(比如静态工厂方法等),因此我们先看伴生对象(即LogObject)的实现。

1objectLog{2valLogFileSuffix=".log"3valIndexFileSuffix=".index"4valTimeIndexFileSuffix=".timeindex"5valProducerSnapshotFileSuffix=".snapshot"6valTxnIndexFileSuffix=".txnindex"7valDeletedFileSuffix=".deleted"8valCleanedFileSuffix=".cleaned"9valSwapFileSuffix=".swap"10valCleanShutdownFile=".kafka_cleanshutdown"11valDeleteDirSuffix="-delete"12valFutureDirSuffix="-future"13……14}这是LogObject定义的所有常量。如果有面试官问你Kafka中定义了多少种文件类型,你可以自豪地把这些说出来。耳熟能详的.log、.index、.timeindex和.txnindex我就不解释了,我们来了解下其他几种文件类型。

.snapshot是Kafka为幂等型或事务型Producer所做的快照文件。鉴于我们现在还处于阅读源码的初级阶段,事务或幂等部分的源码我就不详细展开讲了。

.deleted是删除日志段操作创建的文件。目前删除日志段文件是异步操作,Broker端把日志段文件从.log后缀修改为.deleted后缀。如果你看到一大堆.deleted后缀的文件名,别慌,这是Kafka在执行日志段文件删除。

.cleaned和.swap都是Compaction操作的产物,等我们讲到Cleaner的时候再说。

-delete则是应用于文件夹的。当你删除一个主题的时候,主题的分区文件夹会被加上这个后缀。

-future是用于变更主题分区文件夹地址的,属于比较高阶的用法。

总之,记住这些常量吧。记住它们的主要作用是,以后不要被面试官唬住!开玩笑,其实这些常量最重要的地方就在于,它们能够让你了解Kafka定义的各种文件类型。LogObject还定义了超多的工具类方法。由于它们都很简单,这里我只给出一个方法的源码,我们一起读一下。

1deffilenamePrefixFromOffset(offset:Long):String={2valnf=NumberFormat.getInstance()3nf.setMinimumIntegerDigits(20)4nf.setMaximumFractionDigits(0)5nf.setGroupingUsed(false)6nf.format(offset)7}这个方法的作用是通过给定的位移值计算出对应的日志段文件名。Kafka日志文件固定是20位的长度,filenamePrefixFromOffset方法就是用前面补0的方式,把给定位移值扩充成一个固定20位长度的字符串。

下面我们来看Log源码部分的重头戏:Log类。这是一个2000多行的大类。放眼整个Kafka源码,像Log这么大的类也不多见,足见它的重要程度。我们先来看这个类的定义:

1classLog(@volatilevardir:File,2@volatilevarconfig:LogConfig,3@volatilevarlogStartOffset:Long,4@volatilevarrecoveryPoint:Long,5scheduler:Scheduler,6brokerTopicStats:BrokerTopicStats,7valtime:Time,8valmaxProducerIdExpirationMs:Int,9valproducerIdExpirationCheckIntervalMs:Int,10valtopicPartition:TopicPartition,11valproducerStateManager:ProducerStateManager,12logDirFailureChannel:LogDirFailureChannel)extendsLoggingwithKafkaMetricsGroup{13……14}看着好像有很多属性,但其实,你只需要记住两个属性的作用就够了:dir和logStartOffset。dir就是这个日志所在的文件夹路径,也就是主题分区的路径。而logStartOffset,表示日志的当前最早位移。dir和logStartOffset都是volatilevar类型,表示它们的值是变动的,而且可能被多个线程更新。你可能听过日志的当前末端位移,也就是LogEndOffset(LEO),它是表示日志下一条待插入消息的位移值,而这个LogStartOffset是跟它相反的,它表示日志当前对外可见的最早一条消息的位移值。我用一张图来标识它们的区别:

有意思的是,LogEndOffset可以简称为LEO,但LogStartOffset却不能简称为LSO。因为在Kafka中,LSO特指LogStableOffset,属于Kafka事务的概念。

其实,除了Log类签名定义的这些属性之外,Log类还定义了一些很重要的属性,比如下面这段代码:

1@volatileprivatevarnextOffsetMetadata:LogOffsetMetadata=_2@volatileprivatevarhighWatermarkMetadata:LogOffsetMetadata=LogOffsetMetadata(logStartOffset)3privatevalsegments:ConcurrentNavigableMap[java.lang.Long,LogSegment]=newConcurrentSkipListMap[java.lang.Long,LogSegment]4@volatilevarleaderEpochCache:Option[LeaderEpochFileCache]=None第一个属性nextOffsetMetadata,它封装了下一条待插入消息的位移值,你基本上可以把这个属性和LEO等同起来。

第二个属性highWatermarkMetadata,是分区日志高水位值。

第三个属性segments,我认为这是Log类中最重要的属性。它保存了分区日志下所有的日志段信息,只不过是用Map的数据结构来保存的。Map的Key值是日志段的起始位移值,Value则是日志段对象本身。Kafka源码使用ConcurrentNavigableMap数据结构来保存日志段对象,就可以很轻松地利用该类提供的线程安全和各种支持排序的方法,来管理所有日志段对象。

第四个属性是LeaderEpochCache对象。LeaderEpoch是社区于0.11.0.0版本引入源码中的,主要是用来判断出现Failure时是否执行日志截断操作(Truncation)。之前靠高水位来判断的机制,可能会造成副本间数据不一致的情形。这里的LeaderEpochCache是一个缓存类数据,里面保存了分区Leader的Epoch值与对应位移值的映射关系,我建议你查看下LeaderEpochFileCache类,深入地了解下它的实现原理.

我一般习惯把Log的常见操作分为4大部分。高水位管理操作:高水位的概念在Kafka中举足轻重,对它的管理,是Log最重要的功能之一。日志段管理:Log是日志段的容器。高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题。关键位移值管理:日志定义了很多重要的位移值,比如LogStartOffset和LEO等。确保这些位移值的正确性,是构建消息引擎一致性的基础。读写操作:所谓的操作日志,大体上就是指读写日志。读写操作的作用之大,不言而喻。

源码中日志对象定义高水位的语句只有一行:

@volatileprivatevarhighWatermarkMetadata:LogOffsetMetadata=LogOffsetMetadata(logStartOffset)

这行语句传达了两个重要的事实:高水位值是volatile(易变型)的。因为多个线程可能同时读取它,因此需要设置成volatile,保证内存可见性。另外,由于高水位值可能被多个线程同时修改,因此源码使用JavaMonitor锁来确保并发修改的线程安全。高水位值的初始值是LogStartOffset值。每个Log对象都会维护一个LogStartOffset值。当首次构建高水位时,它会被赋值成LogStartOffset值。你可能会关心LogOffsetMetadata是什么对象。因为它比较重要,我们一起来看下这个类的定义:

1caseclassLogOffsetMetadata(messageOffset:Long,2segmentBaseOffset:Long=Log.UnknownOffset,relativePositionInSegment:Int=LogOffsetMetadata.UnknownFilePosition)

显然,它就是一个POJO类,里面保存了三个重要的变量。

messageOffset:消息位移值,这是最重要的信息。我们总说高水位值,其实指的就是这个变量的值。

segmentBaseOffset:保存该位移值所在日志段的起始位移。日志段起始位移值辅助计算两条消息在物理磁盘文件中位置的差值,即两条消息彼此隔了多少字节。这个计算有个前提条件,即两条消息必须处在同一个日志段对象上,不能跨日志段对象。否则它们就位于不同的物理文件上,计算这个值就没有意义了。这里的segmentBaseOffset,就是用来判断两条消息是否处于同一个日志段的。

relativePositionSegment:保存该位移值所在日志段的物理磁盘位置。这个字段在计算两个位移值之间的物理磁盘位置差值时非常有用。你可以想一想,Kafka什么时候需要计算位置之间的字节数呢?答案就是在读取日志的时候。假设每次读取时只能读1MB的数据,那么,源码肯定需要关心两个位移之间所有消息的总字节数是否超过了1MB。

LogOffsetMetadata类的所有方法,都是围绕这3个变量展开的工具辅助类方法,非常容易理解。我会给出一个方法的详细解释,剩下的你可以举一反三。

1defonSameSegment(that:LogOffsetMetadata):Boolean={2if(messageOffsetOnly)3thrownewKafkaException(s"$thiscannotcompareitssegmentinfowith$thatsinceitonlyhasmessageoffsetinfo")45this.segmentBaseOffset==that.segmentBaseOffset6}看名字我们就知道了,这个方法就是用来判断给定的两个LogOffsetMetadata对象是否处于同一个日志段的。判断方法很简单,就是比较两个LogOffsetMetadata对象的segmentBaseOffset值是否相等。

获取和设置高水位值,关于获取高水位值的方法,其实很好理解,我就不多说了。设置高水位值的方法,也就是Setter方法更复杂一些,为了方便你理解,我用注释的方式来解析它的作用。

1//gettermethod:读取高水位的位移值2defhighWatermark:Long=highWatermarkMetadata.messageOffset34//settermethod:设置高水位值5privatedefupdateHighWatermarkMetadata(newHighWatermark:LogOffsetMetadata):Unit={6if(newHighWatermark.messageOffset<0)//高水位值不能是负数7thrownewIllegalArgumentException("Highwatermarkoffsetshouldbenon-negative")89locksynchronized{//保护Log对象修改的Monitor锁10highWatermarkMetadata=newHighWatermark//赋值新的高水位值11producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)//处理事务状态管理器的高水位值更新逻辑,忽略它……12maybeIncrementFirstUnstableOffset()//FirstUnstableOffset是Kafka事务机制的一部分,忽略它……13}14trace(s"Settinghighwatermark$newHighWatermark")15}更新高水位值,除此之外,源码还定义了两个更新高水位值的方法:updateHighWatermark和maybeIncrementHighWatermark。从名字上来看,前者是一定要更新高水位值的,而后者是可能会更新也可能不会。我们分别看下它们的实现原理。

其实,这两个方法有着不同的用途。updateHighWatermark方法,主要用在Follower副本从Leader副本获取到消息后更新高水位值。一旦拿到新的消息,就必须要更新高水位值;而maybeIncrementHighWatermark方法,主要是用来更新Leader副本的高水位值。需要注意的是,Leader副本高水位值的更新是有条件的——某些情况下会更新高水位值,某些情况下可能不会。就像我刚才说的,Follower副本成功拉取Leader副本的消息后必须更新高水位值,但Producer端向Leader副本写入消息时,分区的高水位值就可能不需要更新——因为它可能需要等待其他Follower副本同步的进度。因此,源码中定义了两个更新的方法,它们分别应用于不同的场景。

读取高水位值关于高水位值管理的最后一个操作是fetchHighWatermarkMetadata方法。它不仅仅是获取高水位值,还要获取高水位的其他元数据信息,即日志段起始位移和物理位置信息。

前面我反复说过,日志是日志段的容器,那它究竟是如何承担起容器一职的呢?

1privatevalsegments:ConcurrentNavigableMap[java.lang.Long,LogSegment]=newConcurrentSkipListMap[java.lang.Long,LogSegment]可以看到,源码使用Java的ConcurrentSkipListMap类来保存所有日志段对象。ConcurrentSkipListMap有2个明显的优势。它是线程安全的,这样Kafka源码不需要自行确保日志段操作过程中的线程安全;它是键值(Key)可排序的Map。Kafka将每个日志段的起始位移值作为Key,这样一来,我们就能够很方便地根据所有日志段的起始位移值对它们进行排序和比较,同时还能快速地找到与给定位移值相近的前后两个日志段。

Log对象维护了一些关键位移值数据,比如LogStartOffset、LEO等。其实,高水位值也算是关键位移值,只不过它太重要了,所以,我单独把它拎出来作为独立的一部分来讲了。

Log对象中的LEO永远指向下一条待插入消息,也就是说,LEO值上面是没有消息的!源码中定义LEO的语句很简单:这里的nextOffsetMetadata就是我们所说的LEO,它也是LogOffsetMetadata类型的对象。Log对象初始化的时候,源码会加载所有日志段对象,并由此计算出当前Log的下一条消息位移值。之后,Log对象将此位移值赋值给LEO。

实际上,LEO对象被更新的时机有4个。Log对象初始化时:当Log对象初始化时,我们必须要创建一个LEO对象,并对其进行初始化。写入新消息时:这个最容易理解。以上面的图为例,当不断向Log对象插入新消息时,LEO值就像一个指针一样,需要不停地向右移动,也就是不断地增加。Log对象发生日志切分(LogRoll)时:日志切分是啥呢?其实就是创建一个全新的日志段对象,并且关闭当前写入的日志段对象。这通常发生在当前日志段对象已满的时候。一旦发生日志切分,说明Log对象切换了ActiveSegment,那么,LEO中的起始位移值和段大小数据都要被更新,因此,在进行这一步操作时,我们必须要更新LEO对象。日志截断(LogTruncation)时:这个也是显而易见的。日志中的部分消息被删除了,自然可能导致LEO值发生变化,从而要更新LEO对象。

现在,我们再来思考一下,Kafka什么时候需要更新LogStartOffset呢?我们一一来看下。Log对象初始化时:和LEO类似,Log对象初始化时要给LogStartOffset赋值,一般是将第一个日志段的起始位移值赋值给它。日志截断时:同理,一旦日志中的部分消息被删除,可能会导致LogStartOffset发生变化,因此有必要更新该值。Follower副本同步时:一旦Leader副本的Log对象的LogStartOffset值发生变化。为了维持和Leader副本的一致性,Follower副本也需要尝试去更新该值。删除日志段时:这个和日志截断是类似的。凡是涉及消息删除的操作都有可能导致LogStartOffset值的变化。删除消息时:严格来说,这个更新时机有点本末倒置了。在Kafka中,删除消息就是通过抬高LogStartOffset值来实现的,因此,删除消息时必须要更新该值。

最后,我重点说说针对Log对象的读写操作。

写操作

在Log中,涉及写操作的方法有3个:appendAsLeader、appendAsFollower和append。appendAsLeader是用于写Leader副本的,appendAsFollower是用于Follower副本同步的。它们的底层都调用了append方法。

Kafka消息格式经历了两次大的变迁,目前是0.11.0.0版本引入的Version2消息格式。我们没有必要详细了解这些格式的变迁,你只需要知道,在0.11.0.0版本之后,lastOffset和lastOffsetOfFirstBatch都是指向消息集合的最后一条消息即可。它们的区别主要体现在0.11.0.0之前的版本。

读操作

read方法的流程相对要简单一些,首先来看它的方法签名:

1defread(startOffset:Long,2maxLength:Int,3isolation:FetchIsolation,4minOneMessage:Boolean):FetchDataInfo={5......6}它接收4个参数,含义如下:startOffset,即从Log对象的哪个位移值开始读消息。maxLength,即最多能读取多少字节。isolation,设置读取隔离级别,主要控制能够读取的最大位移值,多用于Kafka事务。minOneMessage,即是否允许至少读一条消息。设想如果消息很大,超过了maxLength,正常情况下read方法永远不会返回任何消息。但如果设置了该参数为true,read方法就保证至少能够返回一条消息。read方法的返回值是FetchDataInfo类,也是一个POJO类,里面最重要的数据就是读取的消息集合,其他数据还包括位移等元数据信息。

其中,OffsetIndex、TimeIndex和TransactionIndex都继承了AbstractIndex类,而上层的LazyIndex仅仅是包装了一个AbstractIndex的实现类,用于延迟加载。就像我之前说的,LazyIndex的作用是为了提升性能,并没有什么功能上的改进。

AbstractIndex定义了4个属性字段。由于是一个抽象基类,它的所有子类自动地继承了这4个字段。也就是说,Kafka所有类型的索引对象都定义了这些属性。我先给你解释下这些属性的含义。

索引文件(file)。每个索引对象在磁盘上都对应了一个索引文件。你可能注意到了,这个字段是var型,说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗?是的。自1.1.0版本之后,Kafka允许迁移底层的日志路径,所以,索引文件自然要是可以更换的。

起始位移值(baseOffset)。索引对象对应日志段对象的起始位移值。举个例子,如果你查看Kafka日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是00000000000000000123.log,正常情况下,一定还有一组索引文件00000000000000000123.index、00000000000000000123.timeindex等。这里的“123”就是这组文件的起始位移值,也就是baseOffset值。

索引文件最大字节数(maxIndexSize)。它控制索引文件的最大长度。Kafka源码传入该参数的值是Broker端参数segment.index.bytes的值,即10MB。这就是在默认情况下,所有Kafka索引文件大小都是10MB的原因。

索引文件打开方式(writable)。“True”表示以“读写”方式打开,“False”表示以“只读”方式打开。如果我没记错的话,这个参数应该是我加上去的,就是为了修复我刚刚提到的那个Bug。

1privatedefindexSlotRangeFor(idx:ByteBuffer,target:Long,searchEntity:IndexSearchEntity):(Int,Int)={2//第1步:如果当前索引为空,直接返回<-1,-1>对3if(_entries==0)4return(-1,-1)567//第2步:要查找的位移值不能小于当前最小位移值8if(compareIndexEntry(parseEntry(idx,0),target,searchEntity)>0)9return(-1,0)101112//binarysearchfortheentry13//第3步:执行二分查找算法14varlo=015varhi=_entries-116while(lo0)21hi=mid-122elseif(compareResult<0)23lo=mid24else25return(mid,mid)26}272829(lo,if(lo==_entries-1)-1elselo+1)这段代码的核心是,第3步的二分查找算法。熟悉BinarySearch的话,你对这段代码一定不会感到陌生。讲到这里,似乎一切很完美:Kafka索引应用二分查找算法快速定位待查找索引项位置,之后调用parseEntry来读取索引项。不过,这真的就是无懈可击的解决方案了吗?

改进版二分查找算法

大多数操作系统使用页缓存来实现内存映射,而目前几乎所有的操作系统都使用LRU(LeastRecentlyUsed)或类似于LRU的机制来管理页缓存。Kafka写入索引文件的方式是在文件末尾追加写入,而几乎所有的索引查询都集中在索引的尾部。这么来看的话,LRU机制是非常适合Kafka的索引访问场景的。但,这里有个问题是,当Kafka在查询索引的时候,原版的二分查找算法并没有考虑到缓存的问题,因此很可能会导致一些不必要的缺页中断(PageFault)。此时,Kafka线程会被阻塞,等待对应的索引项从物理磁盘中读出并放入到页缓存中。下面我举个例子来说明一下这个情况。假设Kafka的某个索引占用了操作系统页缓存13个页(Page),如果待查找的位移值位于最后一个页上,也就是Page12,那么标准的二分查找算法会依次读取页号0、6、9、11和12。

通常来说,一个页上保存了成百上千的索引项数据。随着索引文件不断被写入,Page#12不断地被填充新的索引项。如果此时索引查询方都来自ISR副本或Lag很小的消费者,那么这些查询大多集中在对Page#12的查询,因此,Page#0、6、9、11、12一定经常性地被源码访问。也就是说,这些页一定保存在页缓存上。后面当新的索引项填满了Page#12,页缓存就会申请一个新的Page来保存索引项,即Page#13。现在,最新索引项保存在Page#13中。如果要查找最新索引项,原版二分查找算法将会依次访问Page#0、7、10、12和13。此时,问题来了:Page7和10已经很久没有被访问过了,它们大概率不在页缓存中,因此,一旦索引开始征用Page#13,就会发生PageFault,等待那些冷页数据从磁盘中加载到页缓存。根据国外用户的测试,这种加载过程可能长达1秒。

显然,这是一个普遍的问题,即每当索引文件占用Page数发生变化时,就会强行变更二分查找的搜索路径,从而出现不在页缓存的冷数据必须要加载到页缓存的情形,而这种加载过程是非常耗时的。基于这个问题,社区提出了改进版的二分查找策略,也就是缓存友好的搜索算法。总体的思路是,代码将所有索引项分成两个部分:热区(WarmArea)和冷区(ColdArea),然后分别在这两个区域内执行二分查找算法。

位移索引也就是所谓的OffsetIndex,它可是一个老资历的组件了。如果我没记错的话,国内大面积使用Kafka应该是在0.8时代。从那个时候开始,OffsetIndex就已经存在了。每当Consumer需要从主题分区的某个位置开始读取消息时,Kafka就会用到OffsetIndex直接定位物理文件位置,从而避免了因为从头读取消息而引入的昂贵的I/O操作。不同索引类型保存不同的对。就OffsetIndex而言,Key就是消息的相对位移,Value是保存该消息的日志段文件中该消息第一个字节的物理文件位置。

为什么是8呢?相对位移是一个整型(Integer),占用4个字节,物理文件位置也是一个整型,同样占用4个字节,因此总共是8个字节。那相对位移是什么值呢?我们知道,Kafka中的消息位移值是一个长整型(Long),应该占用8个字节才对。在保存OffsetIndex的对时,Kafka做了一些优化。每个OffsetIndex对象在创建时,都已经保存了对应日志段对象的起始位移,因此,OffsetIndex索引项没必要保存完整的8字节位移值。相反地,它只需要保存与起始位移的差值(Delta)就够了,而这个差值是可以被整型容纳的。这种设计可以让OffsetIndex每个索引项都节省4个字节。

当读取OffsetIndex时,源码还需要将相对位移值还原成之前的完整位移。这个是在parseEntry方法中实现的。

这个方法返回一个OffsetPosition类型。该类有两个方法,分别返回索引项的Key和Value。这里的parseEntry方法,就是要构造OffsetPosition所需的Key和Value。Key是索引项中的完整位移值,代码使用baseOffset+relativeOffset(buffer,n)的方式将相对位移值还原成完整位移值;Value是这个位移值上消息在日志段文件中的物理位置,代码调用physical方法计算这个物理位置并把它作为Value。最后,parseEntry方法把Key和Value封装到一个OffsetPosition实例中,然后将这个实例返回。

写入索引项好了,有了这些基础,下面的内容就很容易理解了。我们来看下OffsetIndex中最重要的操作——写入索引项append方法的实现。

1defappend(offset:Long,position:Int):Unit={2inLock(lock){3//索引文件如果已经写满,直接抛出异常4require(!isFull,"Attempttoappendtoafullindex(size="+_entries+").")5//要保证待写入的位移值offset比当前索引文件中所有现存的位移值都要大6//这主要是为了维护索引的单调增加性7if(_entries==0||offset>_lastOffset){8trace(s"Addingindexentry$offset=>$positionto${file.getAbsolutePath}")9mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值10mmap.putInt(position)//向mmap写入物理文件位置11_entries+=1//更新索引项个数12_lastOffset=offset//更新当前索引文件最大位移值13//确保写入索引项格式符合要求14require(_entries*entrySize==mmap.position(),s"$entriesentriesbutfilepositioninindexis${mmap.position()}.")15}else{16thrownewInvalidOffsetException(s"Attempttoappendanoffset($offset)toposition$entriesnolargerthan"+17s"thelastoffsetappended(${_lastOffset})to${file.getAbsolutePath}.")18}19}20}

append方法接收两个参数:Long型的位移值和Integer型的物理文件位置。该方法最重要的两步,就是分别向mmap写入相对位移值和物理文件位置。除了append方法,索引还有一个常见的操作:截断操作(Truncation)。截断操作是指,将索引文件内容直接裁剪掉一部分。比如,OffsetIndex索引文件中当前保存了100个索引项,我想只保留最开始的40个索引项。

这个方法接收entries参数,表示要截取到哪个槽,主要的逻辑实现是调用mmap的position方法。源码中的_entries*entrySize就是mmap要截取到的字节处。下面,我来说说OffsetIndex的使用方式。既然OffsetIndex被用来快速定位消息所在的物理文件位置,那么必然需要定义一个方法执行对应的查询逻辑。这个方法就是lookup。

方法返回的,是不大于给定位移值targetOffset的最大位移值,以及对应的物理文件位置。你大致可以把这个方法,理解为位移值的FLOOR函数。

写入索引项TimeIndex也有append方法,只不过它叫作maybeAppend。我们来看下它的实现逻辑。

以后关于kafka系列的总结大部分来自GeekTime的课件,大家可以自行关键字搜索。

THE END
1.寻找论文中的代码知道算法公式怎么找代码邮件联系第一作者(不限于第一作者).如果是一些博士生,有的还是乐于分享代码的,这样自己的文章也更容易被同行引用。 查看引用该论文且使用该论文作为baseline或比较对象的其他论文,找这些论文的作者要代码。 某些论文算法可以分步解决,则可以分别找每一步的代码。 https://blog.csdn.net/qq_42235129/article/details/102814312
2.怎么查看python算法库中算法的源代码在数据科学和机器学习的领域,Python算法库如scikit-learn、numpy和pandas等,功能强大且广泛应用,但了解它们底层实现的细节通常被忽略。因此,设计一个方案以便查看这些算法库中算法的源代码将对学习和研究有很大帮助。 项目目标 教会用户如何获取Python算法库的代码。 https://blog.51cto.com/u_16175436/12335221
3.c语言源代码怎么找C++c语言源代码怎么找 您可以通过以下方式查找 c 语言源代码:查看开源代码库(如 github、bitbucket 和 sourceforge);访问特定领域网站(如 leetcode、hackerrank 和 codechef);利用本地资源(如软件包管理系统);查找现成的项目(如第三方 c 语言库);使用搜索引擎(如 google 或 bing);使用提示(如许可证筛选和代码健康https://www.php.cn/faq/843998.html
4.如何获取代码60秒读懂世界全面指南:如何轻松获取代码及高效使用 引言: 在当今数字化时代,代码已成为推动技术发展的重要工具。无论是开发者还是对编程感兴趣的学习者,获取代码都是开展项目、学习新技术的基础。本文将为您详细解析如何获取代码,以及如何高效地使用这些代码资源。 一、代码获取途径 https://blog.yyzq.team/post/468230.html
5.Python中的查找算法代码实例python这篇文章主要介绍了Python中的查找算法代码实例,算法是解决一系列问题的清晰指令,也就是,能对一定规范的输入,在有限的时间内获得所要求的输出,简单来说,算法就是解决一个问题的具体方法和步骤,算法是程序的灵魂,需要的朋友可以参考下https://www.jb51.net/python/293423gps.htm
6.哈希查找算法的源代码c语言哈希查找算法的源代码c语言【问题描述】针对自己的班集体中的“人名”设计一个哈希表,使得平均查找长度不超过R,完成相应的建表和查表程序。[基本要求]假设人名为中国姓名**语拼音形式。待填入哈希表的人名共有30个,取平均查找长度的上限为2。哈希函数用除留余数法构照,用链表法处理冲突。[测试数据]读取熟悉的30https://www.docin.com/p-646749586.html
7.那些经典的算法:从猜数字到二分查找算法二分查找算法代码实现 publicclassTestBinSearch{staticintbsearch(inta[],intsize,intsearchValue){intlow=0;inthigh=size-1;//用high-low 是为了防止数组过大,两数相加溢出,用移位是为了提升性能intmid=(high-low)>>1+low;while(low<=high){if(a[mid]>searchValue){high=mid-1;}elseif(a[mid]<searchhttps://www.jianshu.com/p/337a81db5e28
8.科学网—[转载]Delaunay三角剖分及算法基本知识2.计算Delaunay三角剖分的算法及分析 3.例子程序&代码 大话 点集的三角剖分(Triangulation),对数值分析(比如有限元分析)以及图形学来说,都是极为重要的一项预处理技术。 尤其是Delaunay三角剖分,由于其独特性,关于点集的很多种几何图都和Delaunay三角剖分相关,如Voronoi图,EMST树,Gabriel图等。Delaunay三角剖分有几https://blog.sciencenet.cn/blog-116465-216935.html
9.实验指导数据结构教学运行与管理三、实验源代码 四、实验结果(测试数据) 第六章 递归实验 6.1折半查找递归算法 一、实验目的 1.理解递归调用的实现过程 2.学会递归程序的设计方法 二、实验内容 1.编写折半查找的递归程序。 2.在VC++的调试环境下观察折半查找递归程序的调用与返回过程,并记录其过程和返回值。 https://www.gxtcmu.edu.cn/ggxy/jysjs1/xxglyxxxtjysyxxxgcjyshs/jxyhygl2/sjjg/content_29239
10.自动驾驶路径规划技术A*启发式搜索算法腾讯云开发者社区A*算法是一种大规模静态路网中求解最短路径最有效的搜索方法,相比于Dijkstra算法,它提供了搜索方向的启发性指引信息,在大多数情况下大大降低了Dijkstra算法无效的冗余的扩展搜索,因此也成为自动驾驶路径规划中的首选算法。 Dijkstra算法和A*算法的伪代码如下,可以看到A*算法搜索过程中,增加了对于未来预测的启发性的Costhttps://cloud.tencent.com/developer/article/1989495
11.GitHubCoding4Real/leetcodePDF版本:「代码随想录」算法精讲 PDF 版本。 最强八股文::代码随想录知识星球精华PDF 刷题顺序: README已经将刷题顺序排好了,按照顺序一道一道刷就可以。 学习社区: 一起学习打卡/面试技巧/如何选择offer/大厂内推/职场规则/简历修改/技术分享/程序人生。欢迎加入「代码随想录」知识星球。 https://github.com/Coding4Real/leetcode-master
12.腾讯算法岗武功秘籍(上)所以,不要存在侥幸心理,踏踏实实的刷题,复习好常规机器学习算法,尤其是算法的原理和应用场景。 ★ 项目和比赛经历非常的重要,往往面试官都是根据项目里用到的方法拓展提问,对项目的优化和改进也问的比较多。还有就是能内推的一定去找学长学姐或是其它资源去内推。 ★ 面试过程中如果实在写不出来代码的话,就给https://www.flyai.com/article/930
13.信息学奥赛算法专题:三分查找搜索算法的步骤及代码以上思路参考了《三分查找》,但也对代码按照我自己的理解,进行了修改,也方便给学生解释。04 —另外一种取值方法 lmid与rmid的取值:一般可以将这两个点取为[l,r]的三等分点。即 lmid=l+(r-l)/3.0;rmid=r-(l-r)/3.0;信息学本身是一门比较难的学科,很多学生会因为老师讲的不够详细,不够透彻,https://baijiahao.baidu.com/s?id=1768637125397387649&wfr=spider&for=pc
14.速石科技Fsched:国产自研调度器的璀璨新星,数百DEBUG:深入代码级的技术支持 举一个典型例子:当研发提交任务出现异常状态,怎么办? 我们首先需要定位与任务相关的日志。日志分为:基础设施层日志、中间件层日志、应用层日志等。 IT和研发工程师的关注点不一样:IT工程师一般看基础设施层日志,CAD和研发工程师看中间件层日志和应用层日志。不同角色各看各的,定位问题https://www.cet.com.cn/itpd/itxw/3439124.shtml