Log the sizes allocated through the SimpleMemoryPool for analysis.
wyuka committed Aug 29, 2022
1 parent 4de9f63 commit c57acec
Showing 9 changed files with 185 additions and 10 deletions.
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.memory;

import java.util.Optional;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;

@@ -42,7 +43,7 @@ public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable
private volatile boolean alive = true;

public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor, null);
super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor, null, Optional.empty());
this.alive = true;
this.gcListenerThread = new Thread(gcListener, "memory pool GC listener");
this.gcListenerThread.setDaemon(true); // so we don't need to worry about shutdown
@@ -0,0 +1,80 @@
package org.apache.kafka.common.memory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class MemoryPoolStatsStore {
private static final Logger log = LoggerFactory.getLogger(MemoryPoolStatsStore.class);

private final AtomicInteger[] histogram;
private final int maxSizeBytes;
private final int segmentSizeBytes;

public static class Range {
public final int startInclusive;
public final int endInclusive;

public Range(int startInclusive, int endInclusive) {
this.startInclusive = startInclusive;
this.endInclusive = endInclusive;
}

@Override
public String toString() {
return "Range{" + "startInclusive=" + startInclusive + ", endInclusive=" + endInclusive + '}';
}
}

public MemoryPoolStatsStore(int segments, int maxSizeBytes) {
histogram = new AtomicInteger[segments];
this.maxSizeBytes = maxSizeBytes;
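// Round up so that `segments` buckets always cover the full [1, maxSizeBytes] range.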
segmentSizeBytes = (int) Math.ceil((double) maxSizeBytes / segments);
for (int segmentIndex = 0; segmentIndex < segments; segmentIndex++) {
histogram[segmentIndex] = new AtomicInteger();
}
}

private int getSegmentIndexForBytes(int bytes) {
if (bytes <= 0) {
throw new IllegalArgumentException("Requested non-positive byte count for allocation: " + bytes);
}
if (bytes > maxSizeBytes) {
log.debug("Requested bytes {} for allocation exceeds maximum recorded value {}", bytes, maxSizeBytes);
return -1;
} else {
return (bytes - 1) / segmentSizeBytes;
}
}

public void recordAllocation(int bytes) {
try {
final int segmentIndex = getSegmentIndexForBytes(bytes);
if (segmentIndex != -1) {
histogram[segmentIndex].incrementAndGet();
}
} catch (IllegalArgumentException e) {
log.error("Encountered error when trying to record memory allocation for request", e);
}
}

public synchronized Map<Range, Integer> getFrequencies() {
Map<Range, Integer> frequenciesMap = new HashMap<>();
for (int segmentIndex = 0; segmentIndex < histogram.length; segmentIndex++) {
frequenciesMap.put(new Range(
segmentIndex * segmentSizeBytes + 1,
segmentIndex * segmentSizeBytes + segmentSizeBytes
), histogram[segmentIndex].intValue());
}
return frequenciesMap;
}

public synchronized void clear() {
for (AtomicInteger atomicInteger : histogram) {
atomicInteger.set(0);
}
}
}
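
The bucket arithmetic above can be exercised with a minimal, hypothetical driver (the class MemoryPoolStatsStoreExample and all sizes are illustrative, not part of this commit). With 4 segments over 1024 bytes, segmentSizeBytes = ceil(1024 / 4) = 256, so the buckets are [1-256], [257-512], [513-768] and [769-1024]:

import org.apache.kafka.common.memory.MemoryPoolStatsStore;

public class MemoryPoolStatsStoreExample {
    public static void main(String[] args) {
        MemoryPoolStatsStore store = new MemoryPoolStatsStore(4, 1024);
        store.recordAllocation(100);  // index (100 - 1) / 256 = 0 -> bucket [1-256]
        store.recordAllocation(256);  // index (256 - 1) / 256 = 0 -> still [1-256]
        store.recordAllocation(300);  // index (300 - 1) / 256 = 1 -> bucket [257-512]
        store.recordAllocation(2048); // exceeds maxSizeBytes: debug-logged, not counted
        store.getFrequencies().forEach((range, count) ->
                System.out.println(range + " -> " + count));
        store.clear();                // zeroes every bucket for the next window
    }
}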
@@ -17,6 +17,7 @@
package org.apache.kafka.common.memory;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.metrics.Sensor;
@@ -37,10 +38,13 @@ public class SimpleMemoryPool implements MemoryPool {
protected final AtomicLong availableMemory;
protected final int maxSingleAllocationSize;
protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds
private final Optional<MemoryPoolStatsStore> memoryPoolStatsStore;
protected volatile Sensor oomTimeSensor;
protected volatile Sensor allocateSensor;

public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor, Sensor allocateSensor) {
public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor,
Sensor allocateSensor, Optional<MemoryPoolStatsStore> memoryPoolStatsStore) {
this.memoryPoolStatsStore = memoryPoolStatsStore;
if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes)
throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size."
+ "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively");
@@ -57,7 +61,8 @@ public ByteBuffer tryAllocate(int sizeBytes) {
if (sizeBytes < 1)
throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
if (sizeBytes > maxSingleAllocationSize)
throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
throw new IllegalArgumentException(
"requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);

long available;
boolean success = false;
@@ -114,6 +119,7 @@ public boolean isOutOfMemory() {
//allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
protected void bufferToBeReturned(ByteBuffer justAllocated) {
this.allocateSensor.record(justAllocated.capacity());
memoryPoolStatsStore.ifPresent(sizeStore -> sizeStore.recordAllocation(justAllocated.capacity()));
log.trace("allocated buffer of size {} ", justAllocated.capacity());
}

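End to end, the new constructor parameter can be exercised with a sketch like this (hypothetical class name and sizes; the null OOM-period sensor mirrors the test setups below, and the allocate sensor is a bare Sensor with no stats attached):

import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.kafka.common.memory.MemoryPoolStatsStore;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

public class PoolStatsWiringExample {
    public static void main(String[] args) {
        // 1000 buckets matches the default segment count added in KafkaConfig;
        // the 10 MiB ceiling is illustrative.
        MemoryPoolStatsStore store = new MemoryPoolStatsStore(1000, 10 * 1024 * 1024);
        Sensor allocateSensor = new Metrics().sensor("memory-pool-allocate");
        SimpleMemoryPool pool = new SimpleMemoryPool(
                1024 * 1024, 512 * 1024, false, null, allocateSensor, Optional.of(store));
        ByteBuffer buffer = pool.tryAllocate(4096); // bufferToBeReturned records 4096
        if (buffer != null)
            pool.release(buffer);
        System.out.println(store.getFrequencies()); // the [1-10486] bucket now reads 1
    }
}
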
@@ -692,7 +692,7 @@ public void testPartialReceiveGracefulClose() throws Exception {
public void testMuteOnOOM() throws Exception {
//clean up default selector, replace it with one that uses a finite mem pool
selector.close();
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor);
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor, Optional.empty());
selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());

@@ -17,6 +17,7 @@
package org.apache.kafka.common.network;

import java.nio.channels.SelectionKey;
import java.util.Optional;
import javax.net.ssl.SSLEngine;

import org.apache.kafka.common.config.SecurityConfig;
@@ -288,7 +289,7 @@ public void testRenegotiationFails() throws Exception {
public void testMuteOnOOM() throws Exception {
//clean up default selector, replace it with one that uses a finite mem pool
selector.close();
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor);
MemoryPool pool = new SimpleMemoryPool(900, 900, false, null, sensor, Optional.empty());
//the initial channel builder is for clients, we need a server one
String tlsProtocol = "TLSv1.2";
File trustStoreFile = File.createTempFile("truststore", ".jks");
16 changes: 13 additions & 3 deletions core/src/main/scala/kafka/network/SocketServer.scala
@@ -38,7 +38,7 @@ import kafka.utils._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.memory.{MemoryPool, RecyclingMemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.memory.{MemoryPool, RecyclingMemoryPool, MemoryPoolStatsStore, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Max, Meter, Percentile, Percentiles, Rate}
@@ -79,9 +79,19 @@ class SocketServer(val config: KafkaConfig,
val time: Time,
val credentialProvider: CredentialProvider,
val observer: Observer,
val apiVersionManager: ApiVersionManager)
val apiVersionManager: ApiVersionManager,
val memoryPoolStatsStore: Optional[MemoryPoolStatsStore])
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

def this(config: KafkaConfig,
metrics: Metrics,
time: Time,
credentialProvider: CredentialProvider,
observer: Observer,
apiVersionManager: ApiVersionManager) {
this(config, metrics, time, credentialProvider, observer, apiVersionManager, Optional.empty())
}

private val maxQueuedRequests = config.queuedMaxRequests

private val nodeId = config.brokerId
@@ -100,7 +110,7 @@ class SocketServer(val config: KafkaConfig,
private val percentiles = (1 to 9).map( i => new Percentile(metrics.metricName("MemoryPoolAllocateSize%dPercentile".format(i * 10), MetricsGroup), i * 10))
// At this stage we do not know the max decrypted request size, so temporarily set it to 10MB.
memoryPoolAllocationSensor.add(new Percentiles(400, 0.0, 10485760, BucketSizing.CONSTANT, percentiles:_*))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolUsageSensor, memoryPoolAllocationSensor)
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolUsageSensor, memoryPoolAllocationSensor, memoryPoolStatsStore)
else if (config.socketRequestCommonBytes > 0) new RecyclingMemoryPool(config.socketRequestCommonBytes, config.socketRequestBufferCacheSize, memoryPoolAllocationSensor)
else MemoryPool.NONE
// data-plane
22 changes: 22 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -84,6 +84,11 @@ object Defaults {
val AllowPreferredControllerFallback = true
val MissingPerTopicConfig = "-1"

val MemoryPoolStatsLoggingEnable = false
val MemoryPoolStatsMaxSize = Integer.MAX_VALUE
val MemoryPoolStatsNumSegments = 1000
val MemoryPoolStatsLoggingFrequencyMinutes = 60

val UnofficialClientLoggingEnable = false
val UnofficialClientCacheTtl = 1
val ExpectedClientSoftwareNames = util.Arrays.asList(
@@ -441,6 +446,10 @@ object KafkaConfig {
val UnofficialClientLoggingEnableProp = "unofficial.client.logging.enable"
val UnofficialClientCacheTtlProp = "unofficial.client.cache.ttl"
val ExpectedClientSoftwareNamesProp = "expected.client.software.names"
val MemoryPoolStatsLoggingEnableProp = "memory.pool.stats.logging.enable"
val MemoryPoolStatsMaxSizeProp = "memory.pool.stats.max.size"
val MemoryPoolStatsNumSegmentsProp = "memory.pool.stats.num.segments"
val MemoryPoolStatsLoggingFrequencyMinutesProp = "memory.pool.stats.logging.frequency.minutes"

/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -780,6 +789,10 @@ object KafkaConfig {
val UnofficialClientLoggingEnableDoc = "Controls whether logging occurs when an ApiVersionsRequest is received from a client unsupported by LinkedIn, such as an Apache Kafka client."
val UnofficialClientCacheTtlDoc = "The amount of time (in hours) for the identity of an unofficial client to live in the local cache to avoid duplicate log messages."
val ExpectedClientSoftwareNamesDoc = "The software names of clients that are supported by LinkedIn, such as Avro, Raw, and Tracking clients."
val MemoryPoolStatsLoggingEnableDoc = "Specifies whether memory pool statistics should be logged."
val MemoryPoolStatsMaxSizeDoc = "Maximum size of memory allocation which will be recorded if memory pool statistics logging is enabled."
val MemoryPoolStatsNumSegmentsDoc = "The number of segments into which the memory pool statistics histogram will be broken."
val MemoryPoolStatsLoggingFrequencyMinutesDoc = "The interval in minutes at which memory pool statistics will be logged and then reset."

/************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements ${classOf[Authorizer].getName}" +
@@ -1214,6 +1227,10 @@ object KafkaConfig {
.define(UnofficialClientLoggingEnableProp, BOOLEAN, Defaults.UnofficialClientLoggingEnable, LOW, UnofficialClientLoggingEnableDoc)
.define(UnofficialClientCacheTtlProp, LONG, Defaults.UnofficialClientCacheTtl, LOW, UnofficialClientCacheTtlDoc)
.define(ExpectedClientSoftwareNamesProp, LIST, Defaults.ExpectedClientSoftwareNames, LOW, ExpectedClientSoftwareNamesDoc)
.define(MemoryPoolStatsLoggingEnableProp, BOOLEAN, Defaults.MemoryPoolStatsLoggingEnable, LOW, MemoryPoolStatsLoggingEnableDoc)
.define(MemoryPoolStatsMaxSizeProp, INT, Defaults.MemoryPoolStatsMaxSize, LOW, MemoryPoolStatsMaxSizeDoc)
.define(MemoryPoolStatsNumSegmentsProp, INT, Defaults.MemoryPoolStatsNumSegments, LOW, MemoryPoolStatsNumSegmentsDoc)
.define(MemoryPoolStatsLoggingFrequencyMinutesProp, INT, Defaults.MemoryPoolStatsLoggingFrequencyMinutes, LOW, MemoryPoolStatsLoggingFrequencyMinutesDoc)

/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
@@ -1723,6 +1740,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def unofficialClientCacheTtl = getLong(KafkaConfig.UnofficialClientCacheTtlProp)
def expectedClientSoftwareNames = getList(KafkaConfig.ExpectedClientSoftwareNamesProp)

def memoryPoolStatsLoggingEnable = getBoolean(KafkaConfig.MemoryPoolStatsLoggingEnableProp)
def memoryPoolStatsMaxSize = getInt(KafkaConfig.MemoryPoolStatsMaxSizeProp)
def memoryPoolStatsNumSegments = getInt(KafkaConfig.MemoryPoolStatsNumSegmentsProp)
def memoryPoolStatsLoggingFrequencyMinutes = getInt(KafkaConfig.MemoryPoolStatsLoggingFrequencyMinutesProp)

def getNumReplicaAlterLogDirsThreads: Int = {
val numThreads: Integer = Option(getInt(KafkaConfig.NumReplicaAlterLogDirsThreadsProp)).getOrElse(logDirs.size)
numThreads
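Taken together, the four new settings could be enabled with a server.properties fragment like the following (hypothetical values; only the segment count and logging frequency shown here match the defaults above):

# Hypothetical fragment; the property names are the ones defined in KafkaConfig above.
memory.pool.stats.logging.enable=true
# Record allocations up to 10 MiB; larger requests are debug-logged, not counted.
memory.pool.stats.max.size=10485760
# ceil(10485760 / 1000) = 10486-byte histogram buckets.
memory.pool.stats.num.segments=1000
# Log and reset the histogram once per hour.
memory.pool.stats.logging.frequency.minutes=60
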
36 changes: 34 additions & 2 deletions core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,6 +37,7 @@ import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPoolStatsStore
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.ControlledShutdownRequestData
import org.apache.kafka.common.metrics.Metrics
@@ -46,13 +47,14 @@ import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledSh
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils, PoisonPill}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, PoisonPill, Time, Utils}
import org.apache.kafka.common.{Endpoint, Node, TopicPartition}
import org.apache.kafka.metadata.BrokerState
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.zookeeper.client.ZKClientConfig

import java.util.Optional
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, Buffer}
@@ -323,13 +325,43 @@ class KafkaServer(

observer = Observer(config)

def initializeMemoryPoolStats(): Optional[MemoryPoolStatsStore] = {
if (!config.memoryPoolStatsLoggingEnable) {
return Optional.empty()
}

info(s"Memory pool stats logging is enabled, segments = " +
s"${config.memoryPoolStatsNumSegments}, max size = ${config.memoryPoolStatsMaxSize}, " +
s"logging frequency in minutes = ${config.memoryPoolStatsLoggingFrequencyMinutes}")
val memoryPoolStatsStore = new MemoryPoolStatsStore(config.memoryPoolStatsNumSegments, config.memoryPoolStatsMaxSize)
val requestStatsLogger = new MemoryPoolStatsLogger()

def publishHistogramToLog(): Unit = {
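// Log first, then reset, so each published histogram covers exactly one logging window.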
info("Publishing memory pool stats")
requestStatsLogger.logStats(memoryPoolStatsStore)
memoryPoolStatsStore.clear()
}

val histogramPublisher = new KafkaScheduler(threads = 1, "histogram-publisher-")
histogramPublisher.startup()
histogramPublisher.schedule(name = "publish-histogram-to-log",
fun = publishHistogramToLog,
period = config.memoryPoolStatsLoggingFrequencyMinutes.toLong,
unit = TimeUnit.MINUTES)

Optional.of(memoryPoolStatsStore)
}

val memoryPoolStatsStore = initializeMemoryPoolStats()

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
//
// Note that we allow the use of KRaft mode controller APIs when forwarding is enabled
// so that the Envelope request is exposed. This is only used in testing currently.
socketServer = new SocketServer(config, metrics, time, credentialProvider, observer, apiVersionManager)
socketServer = new SocketServer(
config, metrics, time, credentialProvider, observer, apiVersionManager, memoryPoolStatsStore)
socketServer.startup(startProcessingRequests = false)

/* start replica manager */
23 changes: 23 additions & 0 deletions core/src/main/scala/kafka/server/MemoryPoolStatsLogger.scala
@@ -0,0 +1,23 @@
package kafka.server

import com.typesafe.scalalogging.Logger
import kafka.utils.Logging
import org.apache.kafka.common.memory.MemoryPoolStatsStore

import scala.collection.JavaConverters._

object MemoryPoolStatsLogger {
private val logger = Logger("memory.pool.stats.logger")
}

class MemoryPoolStatsLogger extends Logging {
override lazy val logger = MemoryPoolStatsLogger.logger

def logStats(memoryPoolStatsStore: MemoryPoolStatsStore): Unit = {
val frequencyList = memoryPoolStatsStore.getFrequencies.asScala.toSeq.sortBy(_._1.startInclusive)
frequencyList.foreach {
case (range, frequency) =>
info(s"[${range.startInclusive}-${range.endInclusive}] = $frequency")
}
}
}
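
Given the info() format above, one publish cycle would emit lines shaped roughly like the following (counts invented for illustration, assuming the 256-byte buckets from the earlier sketch):

[1-256] = 1042
[257-512] = 77
[513-768] = 3
[769-1024] = 0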
