Update test
dagardner-nv committed Dec 20, 2023
1 parent bee89d3 commit ddd6572
Showing 1 changed file with 48 additions and 40 deletions.
88 changes: 48 additions & 40 deletions tests/examples/log_parsing/test_inference.py
@@ -20,21 +20,19 @@
 
 import cupy as cp
 import numpy as np
-import pandas as pd
 import pytest
 
-import cudf
-
 from _utils import TEST_DIRS
 from morpheus.config import Config
 from morpheus.config import PipelineModes
-from morpheus.messages import InferenceMemory
 from morpheus.messages import InferenceMemoryNLP
 from morpheus.messages import MessageMeta
-from morpheus.messages import MultiInferenceMessage
-from morpheus.messages import ResponseMemory
+from morpheus.messages import MultiResponseMessage
+from morpheus.messages import MultiInferenceNLPMessage
+from morpheus.messages import TensorMemory
 from morpheus.stages.inference.triton_inference_stage import TritonInferenceWorker
 from morpheus.utils.producer_consumer_queue import ProducerConsumerQueue
+from morpheus.utils.type_aliases import DataFrameType
 
 
 @pytest.fixture(name="config")
@@ -43,7 +41,7 @@ def config_fixture(config: Config):
     yield config
 
 
-def build_response_mem(log_test_data_dir: str) -> ResponseMemory:
+def build_response_mem(log_test_data_dir: str) -> TensorMemory:
     # we have tensor data for the first five rows
     count = 5
     tensors = {}
@@ -52,15 +50,33 @@ def build_response_mem(log_test_data_dir: str) -> ResponseMemory:
         host_data = np.loadtxt(tensor_file, delimiter=',')
         tensors[tensor_name] = cp.asarray(host_data)
 
-    return ResponseMemory(count=count, **tensors)
+    return TensorMemory(count=count, tensors=tensors)
 
 
-def build_inf_message(df: typing.Union[pd.DataFrame, cudf.DataFrame],
+def build_resp_message(df: DataFrameType, num_cols: int = 2) -> MultiResponseMessage:
+    count = len(df)
+    seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
+    seq_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
+    seq_ids[:, 2] = 42
+
+    meta = MessageMeta(df)
+    mem = TensorMemory(count=count,
+                       tensors={
+                           'confidences': cp.zeros((count, num_cols)),
+                           'labels': cp.zeros((count, num_cols)),
+                           'input_ids': cp.zeros((count, num_cols), dtype=cp.float32),
+                           'seq_ids': seq_ids
+                       })
+
+    return MultiResponseMessage(meta=meta, mess_offset=0, mess_count=count, memory=mem, offset=0, count=count)
+
+
+def build_inf_message(df: DataFrameType,
                       mess_offset: int,
                       mess_count: int,
                       offset: int,
                       count: int,
-                      num_cols: int = 2) -> MultiInferenceMessage:
+                      num_cols: int = 2) -> MultiInferenceNLPMessage:
     assert count >= mess_count
     tensor_length = offset + count
     seq_ids = cp.zeros((tensor_length, 3), dtype=cp.uint32)
@@ -78,12 +94,12 @@ def build_inf_message(df: typing.Union[pd.DataFrame, cudf.DataFrame],
                                  input_mask=cp.zeros((tensor_length, num_cols), dtype=cp.float32),
                                  seq_ids=seq_ids)
 
-    return MultiInferenceMessage(meta=meta,
-                                 mess_offset=mess_offset,
-                                 mess_count=mess_count,
-                                 memory=mem,
-                                 offset=offset,
-                                 count=count)
+    return MultiInferenceNLPMessage(meta=meta,
+                                    mess_offset=mess_offset,
+                                    mess_count=mess_count,
+                                    memory=mem,
+                                    offset=offset,
+                                    count=count)
 
 
 def _check_worker(inference_mod: types.ModuleType, worker: TritonInferenceWorker, expected_mapping: dict[str, str]):
@@ -119,8 +135,7 @@ def test_log_parsing_triton_inference_log_parsing_constructor(config: Config,
 @pytest.mark.import_mod([os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py')])
 @pytest.mark.parametrize("mess_offset,mess_count,offset,count", [(0, 20, 0, 20), (5, 10, 5, 10)])
 def test_log_parsing_triton_inference_log_parsing_build_output_message(config: Config,
-                                                                       filter_probs_df: typing.Union[pd.DataFrame,
-                                                                                                     cudf.DataFrame],
+                                                                       filter_probs_df: DataFrameType,
                                                                        import_mod: typing.List[types.ModuleType],
                                                                        mess_offset: int,
                                                                        mess_count: int,
@@ -212,32 +227,25 @@ def test_log_parsing_inference_stage_get_inference_worker(config: Config, import
 
 @pytest.mark.use_python
 @pytest.mark.usefixtures("manual_seed", "config")
-@pytest.mark.import_mod([
-    os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py'),
-    os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'messages.py')
-])
+@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py'))
 @pytest.mark.parametrize("mess_offset,mess_count,offset,count", [(0, 5, 0, 5), (5, 5, 0, 5)])
 def test_log_parsing_inference_stage_convert_one_response(import_mod: typing.List[types.ModuleType],
-                                                          filter_probs_df: typing.Union[pd.DataFrame, cudf.DataFrame],
+                                                          filter_probs_df: DataFrameType,
                                                           mess_offset,
                                                           mess_count,
                                                           offset,
                                                           count):
-    inference_mod, messages_mod = import_mod
+    inference_mod = import_mod
 
     ttl_count = len(filter_probs_df)
 
     input_res = build_response_mem(os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing'))
 
     # confidences, labels & input_ids all have the same shape
-    num_cols = input_res.confidences.shape[1]
-    input_mem = InferenceMemory(count=ttl_count,
-                                tensors={
-                                    'confidences': cp.zeros((ttl_count, num_cols), dtype=cp.float32),
-                                    'input_ids': cp.zeros((ttl_count, num_cols), dtype=cp.float32),
-                                    'labels': cp.zeros((ttl_count, num_cols), dtype=cp.float32),
-                                    'seq_ids': cp.zeros((ttl_count, 3), dtype=cp.uint32)
-                                })
+    num_cols = input_res.get_tensor('confidences').shape[1]
+    resp_msg = build_resp_message(filter_probs_df, num_cols=num_cols)
+
+    orig_tensors = {k: v.copy() for (k, v) in resp_msg.memory.get_tensors().items()}
 
     input_inf = build_inf_message(filter_probs_df,
                                   mess_offset=mess_offset,
@@ -246,23 +254,23 @@ def test_log_parsing_inference_stage_convert_one_response(import_mod: typing.Lis
                                   count=count,
                                   num_cols=num_cols)
 
-    output_msg = inference_mod.LogParsingInferenceStage._convert_one_response(input_mem, input_inf, input_res)
+    output_msg = inference_mod.LogParsingInferenceStage._convert_one_response(resp_msg, input_inf, input_res)
 
-    assert isinstance(output_msg, messages_mod.MultiPostprocLogParsingMessage)
+    assert isinstance(output_msg, MultiResponseMessage)
     assert output_msg.meta is input_inf.meta
-    assert output_msg.memory is input_mem
     assert output_msg.mess_offset == mess_offset
     assert output_msg.mess_count == mess_count
     assert output_msg.offset == offset
     assert output_msg.count == count
 
     assert (output_msg.seq_ids == input_inf.seq_ids).all()
     assert (output_msg.input_ids == input_inf.input_ids).all()
-    assert (output_msg.confidences == input_res.confidences).all()
-    assert (output_msg.labels == input_res.labels).all()
+    assert (output_msg.confidences == input_res.get_tensor('confidences')).all()
+    assert (output_msg.labels == input_res.get_tensor('labels')).all()
 
     # Ensure we didn't write to the memory outside of the [offset:offset+count] bounds
-    tensors = input_mem.get_tensors()
+    tensors = resp_msg.memory.get_tensors()
     for (tensor_name, tensor) in tensors.items():
-        assert (tensor[0:offset] == 0).all(), f"Out of bounds values for {tensor_name}"
-        assert (tensor[offset + count:] == 0).all(), f"Out of bounds values for {tensor_name}"
+        orig_tensor = orig_tensors[tensor_name]
+        assert (tensor[0:offset] == orig_tensor[0:offset]).all(), f"Out of bounds values for {tensor_name}"
+        assert (tensor[offset + count:] == orig_tensor[offset + count:]).all(), f"Out of bounds values for {tensor_name}"
