Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RM planner test and framework changes #581

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bin/run_testng
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#export MAVEN_OPTS=$MAVEN_OPTS" -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006"
bin=`dirname "${BASH_SOURCE-$0}"`
bin=`cd "$bin">/dev/null; pwd`
cd ${bin}/../framework
source ../conf/drillTestConfig.properties
if [[ $1 == "-Dtest="* ]]; then
mvn test $*
elif [[ $1 == "-h" || $1 == "-help" ]]; then
echo "bin/runtestng [-Dtest=<class_name>[#<method_name>]]"
else
echo "Running the entire Suite"
mvn test -Ptestng $*
fi
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import oadd.org.apache.drill.exec.proto.UserBitShared;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down Expand Up @@ -203,12 +204,12 @@ public String toString() {
}

/**
* Get optimal memory allocated per operator.
* Utility parses the DrillQueryProfile
* Get total optimal memory allocated (in bytes) for specified operator, across all drillbits.
*
* @param operator
* @return
*/
public long getOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType operator) {
public long getTotalOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType operator) {
return this.fragmentProfiles
.stream()
.flatMap(f -> f.minorFragmentProfiles
Expand All @@ -219,6 +220,57 @@ public long getOptimalMemoryPerOperator(final UserBitShared.CoreOperatorType ope
.sum();
}

/**
* Returns the max of optimal memory allocated (in bytes) to specified operator on a drillbit.
*
* @param operator
* @return
*/
public long getOptimalMemoryPerOperatorPerNode(final UserBitShared.CoreOperatorType operator) {
return this.fragmentProfiles
.stream()
.flatMap(f -> f.minorFragmentProfiles
.stream())
.collect(Collectors.groupingBy(m -> m.endpoint.address))
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getValue()
.stream()
.flatMap(m -> m.operatorProfiles
.stream()
.filter(o -> o.operatorId == operator.getNumber()))
.mapToLong(o -> o.optimalMemAllocation)
.sum()))
.entrySet()
.stream()
.mapToLong(Map.Entry::getValue)
.max()
.orElse(0);
}

/**
* Total optimal memory required (in bytes) for the query.
* @return total optimal memory required for the query (as estimated by the RM planner).
*/
public long getTotalOptimalMemory() {
return getOperatorsFromProfile()
.stream()
.mapToLong(this::getTotalOptimalMemoryPerOperator)
.sum();
}

/**
* Returns the maximum of estimated optimal memory (in bytes) required on a drillbit.
* @return total optimal memory required for the query (as estimated by the RM planner).
*/
public long getTotalOptimalMemoryPerNode() {
return getOperatorsFromProfile()
.stream()
.mapToLong(this::getOptimalMemoryPerOperatorPerNode)
.sum();
}

/**
* Get different operators in the profile.
* @return a list of operators in the query profile.
Expand All @@ -233,17 +285,6 @@ public List<UserBitShared.CoreOperatorType> getOperatorsFromProfile() {
.mapToObj(UserBitShared.CoreOperatorType::forNumber)
.collect(Collectors.toList());
}

/**
* Total optimal memory required for the query.
* @return total optimal memory required for the query (as estimated by the RM planner).
*/
public long getTotalOptimalMemoryEstimate() {
return getOperatorsFromProfile()
.stream()
.mapToLong(this::getOptimalMemoryPerOperator)
.sum();
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class DrillRMConfig implements DrillConfigRenderer {
//Selector Configurations
public static final String SELECTOR_TAG_KEY = "tag";
public static final String SELECTOR_ACL_KEY = "acl";
public static final String SELECTOR_OR_KEY = "or";
public static final String SELECTOR_AND_KEY = "and";
public static final String SELECTOR_NOT_EQUAL_KEY = "not_equal";

//ACL Configurations
public static final String ACL_USERS_KEY = "users";
Expand Down Expand Up @@ -74,8 +77,10 @@ public class DrillRMConfig implements DrillConfigRenderer {
public static class SelectorConfig implements DrillConfigRenderer {

public String tag;

public AclConfig acl;
public SelectorConfig not_equal;
public List<SelectorConfig> or;
public List<SelectorConfig> and;

@Override
public String render() {
Expand All @@ -98,6 +103,21 @@ public String render(final int acc) {
sb.append(formatConfig(nextAcc, SELECTOR_ACL_KEY, acl));
}

if (not_equal != null) {
ensureAtleastOneField = true;
sb.append(formatConfig(nextAcc, SELECTOR_NOT_EQUAL_KEY, not_equal));
}

if (or != null) {
ensureAtleastOneField = true;
sb.append(formatConfig(nextAcc, SELECTOR_OR_KEY, or));
}

if (and != null) {
ensureAtleastOneField = true;
sb.append(formatConfig(nextAcc, SELECTOR_AND_KEY, and));
}

if(ensureAtleastOneField) {
sb.deleteCharAt(sb.length() - 1)
.deleteCharAt(sb.length() - 1)
Expand All @@ -119,7 +139,6 @@ public String render(final int acc) {
public static class AclConfig implements DrillConfigRenderer {

public List<String> users;

public List<String> groups;

@Override
Expand Down Expand Up @@ -163,7 +182,7 @@ public String render(final int acc) {
public static class QueueConfig implements DrillConfigRenderer {

@JsonProperty(QUEUE_MAX_QUERY_MEMORY_PER_NODE_KEY)
public long maxQueryMemoryPerNodeInMB;
public long maxQueryMemoryPerNode; //in bytes

@JsonProperty(QUEUE_MAX_WAITING_KEY)
public int maxWaitingQueries;
Expand All @@ -185,9 +204,9 @@ public String render(final int acc) {
StringBuilder sb = new StringBuilder("{\n");
final int nextAcc = acc+2;

if (maxQueryMemoryPerNodeInMB > 0) {
if (maxQueryMemoryPerNode > 0) {
ensureAtleastOneField = true;
sb.append(formatConfig(nextAcc, QUEUE_MAX_QUERY_MEMORY_PER_NODE_KEY, maxQueryMemoryPerNodeInMB));
sb.append(formatConfig(nextAcc, QUEUE_MAX_QUERY_MEMORY_PER_NODE_KEY, maxQueryMemoryPerNode));
}

if (maxWaitingQueries > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import java.util.Properties;

import static org.apache.drill.test.framework.DrillTestDefaults.DRILL_EXEC_RM_CONFIG_KEY;
import static org.apache.drill.test.framework.common.DrillTestNGDefaults.BASIC_RM_CONFIG_FILEPATH;
import static org.apache.drill.test.framework.common.DrillTestNGDefaults.UNIT_GROUP;
import static org.apache.drill.test.framework.common.DrillTestNGDefaults.SAMPLE_RM_CONFIG_NAME;
import static org.apache.drill.test.framework.common.DrillTestNGDefaults.PROD_RM_CONFIG_FILEPATH;

@Test(groups = UNIT_GROUP)
public class DrillTestFrameworkUnitTests extends DrillJavaTestBase {
Expand Down Expand Up @@ -84,7 +85,7 @@ public void testQueryProfileDoesNotExist() {
@Test(groups = UNIT_GROUP)
public void testReadSampleRMConfigFile() {
try {
DrillRMConfig drillRMConfig = DrillRMConfig.load(SAMPLE_RM_CONFIG_NAME);
DrillRMConfig drillRMConfig = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH);
Assert.assertEquals(drillRMConfig.poolName, "root",
"Root resource pool name did not match");

Expand All @@ -96,6 +97,22 @@ public void testReadSampleRMConfigFile() {
}
}

/**
* Test reading a sample RM config file, with complex selectors, in to a Java Bean.
*/
@Test(groups = UNIT_GROUP)
public void testReadComplexSelectorsRMConfigFile() {
try {
DrillRMConfig drillRMConfig = DrillRMConfig.load(BASIC_RM_CONFIG_FILEPATH);
Assert.assertEquals(drillRMConfig.childPools.get(1).selector.or.size(), 2,
"Or selector should have had 2 children!");

} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

/**
* Negative test to validate the behavior when the config file does not exist.
*/
Expand All @@ -112,7 +129,7 @@ public void testLoadConfigWhenFileDoesNotExist() throws IOException {
@Test(groups = UNIT_GROUP)
public void testConfigFileRenderer() {
try {
DrillRMConfig drillRMConfig = DrillRMConfig.load(SAMPLE_RM_CONFIG_NAME);
DrillRMConfig drillRMConfig = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH);
Assert.assertEquals(drillRMConfig.poolName, "root",
"Root resource pool name did not match");

Expand Down Expand Up @@ -149,7 +166,7 @@ public void testWriteRMConfigToFile() throws IOException {
}

try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath))) {
DrillRMConfig drillRMConfig = DrillRMConfig.load(SAMPLE_RM_CONFIG_NAME);
DrillRMConfig drillRMConfig = DrillRMConfig.load(PROD_RM_CONFIG_FILEPATH);
writer.write(DRILL_EXEC_RM_CONFIG_KEY + ":" + drillRMConfig.render());
}

Expand Down Expand Up @@ -178,14 +195,15 @@ public void testTotalMemoryForQueryProfile() {
DrillQueryProfile profile = Utils.getQueryProfile(queryId);
Assert.assertEquals(profile.queryId, queryId);

long rmMemEstimate = profile.getTotalOptimalMemoryEstimate();
long rmMemEstimate = profile.getTotalOptimalMemoryPerNode();
LOG.info("Memory estimated by RM planner: " + rmMemEstimate);
Assert.assertTrue(rmMemEstimate > 0,
"RM estimated memory should be greater than 0");
List<UserBitShared.CoreOperatorType> operators = profile.getOperatorsFromProfile();
Assert.assertTrue(operators.size() > 0,
"Number of operators in the profile should be greater than 0");
operators.forEach(LOG::info);
operators.forEach(o -> LOG.info("Operator: " + o + ", Optimal Memory per Node in bytes: " +
profile.getOptimalMemoryPerOperatorPerNode(o)));
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
public final class DrillTestNGDefaults {
public static final String FUNCTIONAL_GROUP = "functional";
public static final String UNIT_GROUP = "unit";
public static final String SAMPLE_RM_CONFIG_NAME =
DrillTestDefaults.CWD + "/src/test/resources/sample-drill-rm-override.conf";
public static final String BASIC_RM_CONFIG_NAME =
DrillTestDefaults.CWD + "/src/test/resources/basic-drill-rm-override.conf";
public static final String PROD_RM_CONFIG_FILEPATH =
DrillTestDefaults.CWD + "/src/test/resources/prod-rm.conf";
public static final String BASIC_RM_CONFIG_FILEPATH =
DrillTestDefaults.CWD + "/src/test/resources/basic-rm.conf";
public static final String DEFAULT_RM_CONFIG_FILEPATH =
DrillTestDefaults.CWD + "/src/test/resources/default-rm.conf";

public static String CONNECTION_URL_FOR_DRILLBIT(final String hostnameOrIp) {
return String.format("jdbc:drill:drillbit=%s", hostnameOrIp);
}
public static final String NO_RESOURCE_POOL_ERROR = "No resource pools to choose from for the query";
public static final String TPCH_01_PARQUET_SCHEMA = "dfs.drilltestdirtpch01parquet";
}
Loading