Skip to content

Commit

Permalink
refactor: app env query and machineRoom and message handle pre and po…
Browse files Browse the repository at this point in the history
…st (#356)
  • Loading branch information
wtt40122 authored Jun 5, 2024
1 parent f6f4b84 commit d88a0fa
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,9 @@ public Result<List<MapDTO>> queryAppByStoreId(@RequestParam(value = "storeId") L
*/
@RequestMapping(path = "/milog/project/env/appId", method = "get")
public Result<List<MilogAppEnvDTO>> getEnInfosByAppId(@RequestParam(value = "milogAppId") Long milogAppId,
@RequestParam(value = "deployWay") Integer deployWay) {
return logTailService.getEnInfosByAppId(milogAppId, deployWay);
@RequestParam(value = "deployWay") Integer deployWay,
@RequestParam(value = "machineRoom") String machineRoom) {
return logTailService.getEnInfosByAppId(milogAppId, deployWay,machineRoom);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public interface LogTailService {
* @param deployWay
* @return
*/
Result<List<MilogAppEnvDTO>> getEnInfosByAppId(Long milogAppId, Integer deployWay);
Result<List<MilogAppEnvDTO>> getEnInfosByAppId(Long milogAppId, Integer deployWay, String machineRoom);

Result<List<String>> getTailNamesBystoreId(String tail, Integer appType, Long id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void logTailDelPostProcess(MilogLogStoreDO logStoreDO, MilogLogTailDo mil
}

@Override
public List<MilogAppEnvDTO> getEnInfosByAppId(AppBaseInfo appBaseInfo, Long milogAppId, Integer deployWay) {
public List<MilogAppEnvDTO> getEnInfosByAppId(AppBaseInfo appBaseInfo, Long milogAppId, Integer deployWay, String machineRoom) {
List<HeraSimpleEnv> heraSimpleEnvs = null;
try {
heraSimpleEnvs = heraAppEnvService.querySimpleEnvAppBaseInfoId(milogAppId.intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public interface TailExtensionService {

void logTailDelPostProcess(MilogLogStoreDO logStoreDO, MilogLogTailDo milogLogtailDo);

List<MilogAppEnvDTO> getEnInfosByAppId(AppBaseInfo appBaseInfo, Long milogAppId, Integer deployWay);
List<MilogAppEnvDTO> getEnInfosByAppId(AppBaseInfo appBaseInfo, Long milogAppId, Integer deployWay, String machineRoom);

boolean decorateTailDTOValId(Integer logType, Integer appType);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,15 +550,15 @@ private List<MapDTO> queryAppInfo(String appName, Integer type) {
* @return
*/
@Override
public Result<List<MilogAppEnvDTO>> getEnInfosByAppId(Long milogAppId, Integer deployWay) {
public Result<List<MilogAppEnvDTO>> getEnInfosByAppId(Long milogAppId, Integer deployWay, String machineRoom) {
if (null == milogAppId) {
return Result.failParam("The parameter cannot be empty");
}
AppBaseInfo appBaseInfo = heraAppService.queryById(milogAppId);
if (null == appBaseInfo) {
return Result.failParam("The app does not exist");
}
List<MilogAppEnvDTO> appEnvDTOList = tailExtensionService.getEnInfosByAppId(appBaseInfo, milogAppId, deployWay);
List<MilogAppEnvDTO> appEnvDTOList = tailExtensionService.getEnInfosByAppId(appBaseInfo, milogAppId, deployWay,machineRoom);
return Result.success(appEnvDTOList);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class LogStreamConstants {

public static final String postProcessingProviderBeanSuffix = "postProcessing";

public static final String DEFAULT_MESSAGE_LIFECYCLE_MANAGER = "defaultMessageLifecycleManager";

public static final String LOG_STREAM_SPACE_ID = "spaceId";
public static final String LOG_STREAM_STORE_ID = "storeId";
public static final String LOG_STREAM_TAIL_ID = "tailId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/
package com.xiaomi.mone.log.stream.job;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.parse.LogParser;
import com.xiaomi.mone.log.stream.common.LogStreamConstants;
import com.xiaomi.mone.log.stream.common.SinkJobEnum;
import com.xiaomi.mone.log.stream.job.extension.DefaultLogSendFilter;
import com.xiaomi.mone.log.stream.job.extension.MessageLifecycleManager;
import com.xiaomi.mone.log.stream.job.extension.MessageSender;
import com.xiaomi.mone.log.stream.job.extension.MqMessagePostProcessing;
import com.xiaomi.mone.log.stream.sink.SinkChain;
Expand Down Expand Up @@ -67,6 +70,8 @@ public class LogDataTransfer {

private LogSendFilter logSendFilter;

private MessageLifecycleManager messageLifecycleManager;

public LogDataTransfer(SinkChain sinkChain, LogParser logParser,
MessageSender messageSender, SinkJobConfig sinkJobConfig) {
this.sinkChain = sinkChain;
Expand All @@ -76,35 +81,60 @@ public LogDataTransfer(SinkChain sinkChain, LogParser logParser,
String mqPostProcessingBean = sinkJobConfig.getMqType() + LogStreamConstants.postProcessingProviderBeanSuffix;
this.messagePostProcessing = Ioc.ins().getBean(mqPostProcessingBean);
this.logSendFilter = Ioc.ins().getBean(DefaultLogSendFilter.class);

this.messageLifecycleManager = getMessageLifecycleManager();
}

private MessageLifecycleManager getMessageLifecycleManager() {
String factualServiceName = Config.ins().get("message.lifecycle.manager", DEFAULT_MESSAGE_LIFECYCLE_MANAGER);
return Ioc.ins().getBean(factualServiceName);
}


public void handleMessage(String type, String msg, String time) {
Map<String, Object> dataMap;
try {
LineMessage lineMessage = objectMapper.readValue(msg, LineMessage.class);

String ip = lineMessage.getProperties(LineMessage.KEY_IP);
Long lineNumber = lineMessage.getLineNumber();
dataMap = logParser.parse(lineMessage.getMsgBody(), ip, lineNumber, lineMessage.getTimestamp(), lineMessage.getFileName());
putCommonData(dataMap);
if (SinkJobEnum.NORMAL_JOB == jobType) {
if (null != dataMap && !sinkChain.execute(dataMap)) {
sendMessage(dataMap);
}
} else {
sendMessage(dataMap);
}
if (sendMsgNumber.get() % COUNT_NUM == 0 || sendMsgNumber.get() == 1) {
log.info(jobType.name() + " send msg:{}", dataMap);
}
LineMessage lineMessage = parseLineMessage(msg);

messageLifecycleManager.beforeProcess(sinkJobConfig, lineMessage);

Map<String, Object> dataMap = parseMessage(lineMessage);

messageLifecycleManager.afterProcess(sinkJobConfig, lineMessage, dataMap);

toSendMessage(dataMap);

messagePostProcessing.postProcessing(sinkJobConfig, msg);
} catch (Exception e) {
log.error(jobType.name() + " parse and send error", e);
throw new RuntimeException(String.format("handleMessage error,msg:%s", msg), e);
}
}

private void toSendMessage(Map<String, Object> dataMap) throws Exception {
if (SinkJobEnum.NORMAL_JOB == jobType) {
if (null != dataMap && !sinkChain.execute(dataMap)) {
sendMessage(dataMap);
}
} else {
sendMessage(dataMap);
}
if (sendMsgNumber.get() % COUNT_NUM == 0 || sendMsgNumber.get() == 1) {
log.info(jobType.name() + " send msg:{}", dataMap);
}
}

private Map<String, Object> parseMessage(LineMessage lineMessage) {
String ip = lineMessage.getProperties(LineMessage.KEY_IP);
Long lineNumber = lineMessage.getLineNumber();
Map<String, Object> dataMap = logParser.parse(lineMessage.getMsgBody(), ip, lineNumber, lineMessage.getTimestamp(), lineMessage.getFileName());
putCommonData(dataMap);
return dataMap;
}

private LineMessage parseLineMessage(String msg) throws JsonProcessingException {
return objectMapper.readValue(msg, LineMessage.class);
}

private void putCommonData(Map<String, Object> dataMap) {
dataMap.putIfAbsent(LOG_STREAM_SPACE_ID, sinkJobConfig.getLogSpaceId());
dataMap.putIfAbsent(LOG_STREAM_STORE_ID, sinkJobConfig.getLogStoreId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 Xiaomi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.xiaomi.mone.log.stream.job.extension;

import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.stream.job.SinkJobConfig;

import java.util.Map;

/**
*
* @description
* @version 1.0
* @author wtt
* @date 2024/6/4 11:20
*
*/
public interface MessageLifecycleManager {

void beforeProcess(SinkJobConfig sinkJobConfig, LineMessage lineMessage);

void afterProcess(SinkJobConfig sinkJobConfig, LineMessage lineMessage, Map<String, Object> message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Xiaomi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.xiaomi.mone.log.stream.job.extension.impl;

import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.stream.job.SinkJobConfig;
import com.xiaomi.mone.log.stream.job.extension.MessageLifecycleManager;
import com.xiaomi.youpin.docean.anno.Service;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

import static com.xiaomi.mone.log.stream.common.LogStreamConstants.DEFAULT_MESSAGE_LIFECYCLE_MANAGER;

/**
*
* @description
* @version 1.0
* @author wtt
* @date 2024/6/4 11:24
*
*/
@Service(name = DEFAULT_MESSAGE_LIFECYCLE_MANAGER)
@Slf4j
public class DefaultMessageLifecycleManager implements MessageLifecycleManager {
@Override
public void beforeProcess(SinkJobConfig sinkJobConfig, LineMessage lineMessage) {

}

@Override
public void afterProcess(SinkJobConfig sinkJobConfig, LineMessage lineMessage, Map<String, Object> message) {

}
}

0 comments on commit d88a0fa

Please sign in to comment.