Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ransomware production version #1176

Open
wants to merge 2 commits into
base: branch-23.11
Choose a base branch
from

Conversation

elishahaim
Copy link

In this version, we updated the ransomware detection pipeline to the production environment

@elishahaim elishahaim requested review from a team as code owners September 7, 2023 16:34
@copy-pr-bot
Copy link

copy-pr-bot bot commented Sep 7, 2023

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

Copy link
Contributor

@mdemoret-nv mdemoret-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below are some general comments that need to be fixed:

  • The .idea directory and all of its contents should not be included in the PR. If we need to add to the .gitignore to prevent this in the future, we can.
  • Many find/replace errors changing common -> common2 need to be reverted.
  • The example changed the input source from a file to Kafka but the README was not updated to describe setting up Kafka and seeding the data into the message broker
  • CI needs to be passing

In addition, @bsuryadevara can you review the changes to the ransomware example stages?

@@ -404,7 +404,7 @@ def get_merge_target():

def determine_merge_commit(current_branch="HEAD"):
"""
When running outside of CI, this will estimate the target merge commit hash of `current_branch` by finding a common
When running outside of CI, this will estimate the target merge commit hash of `current_branch` by finding a common2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Find/replace error

@@ -416,7 +416,7 @@ def determine_merge_commit(current_branch="HEAD"):
Returns
-------
str
The common commit hash ID
The common2 commit hash ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Find/replace error

@@ -43,7 +43,7 @@ docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v $PWD/model
tritonserver --model-repository=/models/triton-model-repo \
--exit-on-error=false \
--model-control-mode=explicit \
--load-model ransomw-model-short-rf
--load-model ransomware_model_tl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of our models use hyphens instead of underscores. Can you rename the model ransomware-model-tl?

Comment on lines -1 to -14
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this copyright removed?

@@ -0,0 +1,6 @@
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: What is this file used for?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsuryadevara I found this file in the previous model. Do you know if we need it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elishahaim It is generated by MLFlow not needed

@@ -60,7 +60,7 @@ def __init__(
n_workers: int = 2,
threads_per_worker: int = 2,
):
self._client = Client(threads_per_worker=threads_per_worker, n_workers=n_workers)
# self._client = Client(threads_per_worker=threads_per_worker, n_workers=n_workers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excess comment should be removed.


extract_func = self._fe.extract_features
combine_func = FeatureExtractor.combine_features
df['PID_Process'] = df.PID.astype(str) + '_'# + df.Process
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excess comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elishahaim do we need concatenate _ to PID_Process even though the process is being commented out?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to verify it, if we really need the underscore

Copy link
Author

@elishahaim elishahaim Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsuryadevara I checked it, and we can remove it:
df['PID_Process'] = df.PID.astype(str)

# Close dask client when pipeline initiates shutdown
self._client.close()
pass
# self._client.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More excess comments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please remove the on_completed function as we are not using Dask for the create_features stage?

"""
This class extends PreprocessBaseStage and process the features that are derived from Appshield data.
This class extends PreprocessBaseStage and process the features that aree derived from Appshield data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

offset=0,
count=snapshot_df_size)
current_time = datetime.datetime.now()
print(f"Preprocessing snapshot sequence: {sequence} is completed at time: {current_time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the logging module instead of print()

@mdemoret-nv
Copy link
Contributor

/ok to test

@mdemoret-nv mdemoret-nv added non-breaking Non-breaking change improvement Improvement to existing functionality labels Sep 19, 2023
@bsuryadevara
Copy link
Contributor

Below are some general comments that need to be fixed:

  • The .idea directory and all of its contents should not be included in the PR. If we need to add to the .gitignore to prevent this in the future, we can.
  • Many find/replace errors changing common -> common2 need to be reverted.
  • The example changed the input source from a file to Kafka but the README was not updated to describe setting up Kafka and seeding the data into the message broker
  • CI needs to be passing

In addition, @bsuryadevara can you review the changes to the ransomware example stages?

@mdemoret-nv sure, will review

@mdemoret-nv
Copy link
Contributor

If the AppShieldSourceStage is no longer being used anywhere, lets consider removing it and any tests for that stage.

"""
return (MessageMeta, )

def supports_cpp_node(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add return type annotation to all public functions?

multiple_snapshots.setdefault(source, []).append(scan_id)
return multiple_snapshots

def _hold_plugin_df(self, source, scan_id, plugin, plugin_df):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to consider adding a time parameter to retain the plugin dataframes in memory?

For instance, when the pipeline receives snapshot 1, plugin 1 processes it, and plugin 2 waits for plugin 3 to bundle its data and push it to the "create features" stage. However, in some cases, if plugin 3 fails to ingest data into the pipeline due to various reasons memory allocated to the plugin 1 and plugin 2 will not be released.

With the addition of a time parameter, we can ensure that plugin 1 and plugin 2 remain in memory only for the duration of the pipeline context, improving memory management and resource efficiency.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bsuryadevara,
I think we already removing the old snapshots.
I printed the existing snapshots in the source (dictionary - memory object), and it looks like we already clean it whenever a new snapshot is added:
dictionary_source

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to handled the missed sequence snapshots? For example, if snapshots s1 and s12 are received but s3 is skipped and s4 arrives instead, will the workflow retain those snapshots held in the memory.
All we have to do is, If a sequence is found to be missing, add a check condition to initiate the cleanup process for the previous snapshots that are stored in memory.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

def _clean_snapshots(self, source, scan_id):
    scan_ids_exist = source.keys()
    for scan_id_exist in scan_ids_exist:
        if scan_id > scan_id_exist+2:
            del source[scan_id_exist]

def _hold_plugin_df(self, source, scan_id, plugin, plugin_df):
    if source not in self._plugin_df_dict:
        self._plugin_df_dict[source] = {}
    source = self._plugin_df_dict[source]

    if scan_id not in source:
        source[scan_id] = {}
    snapshot = source[scan_id]

    if plugin not in snapshot:
        snapshot[plugin] = plugin_df
    else:
        snapshot[plugin] = pd.concat([snapshot[plugin], plugin_df])

    self._clean_snapshots(source, scan_id)

Comment on lines +142 to +147
metas = []

for source, df in x.items():
# Now make a AppShieldMessageMeta with the source name
meta = AppShieldMessageMeta(df, source)
metas.append(meta)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
metas = []
for source, df in x.items():
# Now make a AppShieldMessageMeta with the source name
meta = AppShieldMessageMeta(df, source)
metas.append(meta)
metas = [AppShieldMessageMeta(df, source) for source, df in x.items()]


extract_func = self._fe.extract_features
combine_func = FeatureExtractor.combine_features
df['PID_Process'] = df.PID.astype(str) + '_'# + df.Process
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elishahaim do we need concatenate _ to PID_Process even though the process is being commented out?

# Close dask client when pipeline initiates shutdown
self._client.close()
pass
# self._client.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please remove the on_completed function as we are not using Dask for the create_features stage?

# Amount of files path in handles files
file_paths = x[x.Type == 'File'].Name.str.lower()
file_paths = x.Name.str.lower()#x[x.Type == 'File'].Name.str.lower()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excess comment

Comment on lines 48 to 59
@click.option(
"--n_dask_workers",
default=6,
default=1,
type=click.IntRange(min=1),
help="Number of dask workers.",
)
@click.option(
"--threads_per_dask_worker",
default=2,
default=1,
type=click.IntRange(min=1),
help="Number of threads per each dask worker.",
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this option as Dask not being used?

@@ -108,20 +110,13 @@ Options:
--server_url TEXT Tritonserver url [required]
--sliding_window INTEGER RANGE Sliding window to be used for model input
request [x>=1]
--input_glob TEXT Input glob pattern to match files to read.
--input_topic TEXT Input Kafka topic for receiving the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you remove Dask related options?

@@ -72,10 +72,12 @@ Run the following from the `examples/ransomware_detection` directory to start th
```bash
python run.py --server_url=localhost:8001 \
--sliding_window=3 \
--model_name=ransomw-model-short-rf \
--model_name=ransomware_model_tl \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we include some high-level information about the model, including details about the dataset used for training? Additionally, it would be greatly beneficial to explain how to generate the dataset for training the models or specify the required data for running the inference pipeline.

Would be valuable to create a notebook that demonstrates how to train the model using a sample dataset and run it through the pipeline to showcase its ransomware capabilities.

Providing an explanation of the output structure generated by the pipeline would greatly enhance the comprehensibility of the documentation. Adding this information to the documentation would be much appreciated.

snapshot_ids: typing.List[int],
source_pid_process: str,
snapshot_df: pd.DataFrame):
def _rollover_pending_snapshots(self, source_pid_process: str, snapshots_dict):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also consider adding a time duration to retain the pending rollover snapshots in memory? Without this, they will remain in memory and keep waiting for the sequence (in case if sequences were not ingested to the pipeline)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @bsuryadevara ,
I added a function that checks if the scan_ids are consecutively ordered.. like: [1,2,3]...
I think that we don't have pending snapshots in the memory... (I printed them, and they were always cleaned)...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw in the past, that we can have an anomaly if we have old snapshots. I.e, if we restarted the pipeline but we had snapshots that we don't read from kafka... So it can be problematic...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
improvement Improvement to existing functionality non-breaking Non-breaking change
Projects
Status: Review - Changes Requested
Development

Successfully merging this pull request may close these issues.

3 participants