From a6e1624f489b9e4886d87cbb36beaabb12fe8f56 Mon Sep 17 00:00:00 2001 From: Davi Arnaut Date: Sat, 26 Oct 2024 00:10:40 -0400 Subject: [PATCH] fix(log): reduce log volume for ingestion and consumers (#11714) --- .../linkedin/metadata/entity/EntityServiceImpl.java | 10 ++++++---- .../search/elasticsearch/ElasticSearchService.java | 2 +- .../com/linkedin/metadata/kafka/MCLKafkaListener.java | 2 +- .../metadata/resources/entity/AspectResource.java | 9 --------- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 9f608be4f3d18..9337ea3c2b6f7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -164,6 +164,8 @@ public class EntityServiceImpl implements EntityService { private final Integer ebeanMaxTransactionRetry; private final boolean enableBrowseV2; + private static final long DB_TIMER_LOG_THRESHOLD_MS = 50; + @Getter private final Map, ThrottleEvent> throttleEvents = new ConcurrentHashMap<>(); @@ -997,10 +999,10 @@ private List ingestAspectsToLocalDB( if (txContext != null) { txContext.commitAndContinue(); } - long took = ingestToLocalDBTimer.stop(); - log.info( - "Ingestion of aspects batch to database took {} ms", - TimeUnit.NANOSECONDS.toMillis(took)); + long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop()); + if (took > DB_TIMER_LOG_THRESHOLD_MS) { + log.info("Ingestion of aspects batch to database took {} ms", took); + } // Retention optimization and tx if (retentionService != null) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 6001e2f6e660f..261ec127d5497 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -112,7 +112,7 @@ public void appendRunId( @Nullable String runId) { final String docId = indexBuilders.getIndexConvention().getEntityDocumentId(urn); - log.info( + log.debug( "Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId); esWriteDAO.applyScriptUpdate( opContext, diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java index ff929603514c5..a2d59023ba5ce 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListener.java @@ -95,7 +95,7 @@ public void consume(final ConsumerRecord consumerRecord) // Here - plug in additional "custom processor hooks" for (MetadataChangeLogHook hook : this.hooks) { - log.info( + log.debug( "Invoking MCL hook {} for urn: {}", hook.getClass().getSimpleName(), event.getEntityUrn()); diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 634e8d32cc722..63b607f8c9967 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -243,13 +243,6 @@ public Task ingestProposal( @ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal, @ActionParam(PARAM_ASYNC) @Optional(UNSET) String async) throws URISyntaxException { - - String urn = metadataChangeProposal.getEntityUrn() != null ? metadataChangeProposal.getEntityUrn().toString() : - java.util.Optional.ofNullable(metadataChangeProposal.getEntityKeyAspect()).orElse(new GenericAspect()) - .getValue().asString(StandardCharsets.UTF_8); - String proposedValue = java.util.Optional.ofNullable(metadataChangeProposal.getAspect()).orElse(new GenericAspect()) - .getValue().asString(StandardCharsets.UTF_8); - final boolean asyncBool; if (UNSET.equals(async)) { asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME)); @@ -266,8 +259,6 @@ public Task ingestProposalBatch( @ActionParam(PARAM_PROPOSALS) @Nonnull MetadataChangeProposal[] metadataChangeProposals, @ActionParam(PARAM_ASYNC) @Optional(UNSET) String async) throws URISyntaxException { - log.info("INGEST PROPOSAL BATCH proposals: {}", Arrays.asList(metadataChangeProposals)); - final boolean asyncBool; if (UNSET.equals(async)) { asyncBool = Boolean.parseBoolean(System.getenv(ASYNC_INGEST_DEFAULT_NAME));