Ransomware production version #1176
base: branch-23.11
Conversation
Below are some general comments that need to be fixed:

- The `.idea` directory and all of its contents should not be included in the PR. If we need to add to the `.gitignore` to prevent this in the future, we can.
- Many find/replace errors changing `common` -> `common2` need to be reverted.
- The example changed the input source from a file to Kafka, but the README was not updated to describe setting up Kafka and seeding the data into the message broker.
- CI needs to be passing.

In addition, @bsuryadevara can you review the changes to the ransomware example stages?
```diff
@@ -404,7 +404,7 @@ def get_merge_target():

 def determine_merge_commit(current_branch="HEAD"):
     """
-    When running outside of CI, this will estimate the target merge commit hash of `current_branch` by finding a common
+    When running outside of CI, this will estimate the target merge commit hash of `current_branch` by finding a common2
```
Find/replace error
```diff
@@ -416,7 +416,7 @@ def determine_merge_commit(current_branch="HEAD"):
     Returns
     -------
     str
-        The common commit hash ID
+        The common2 commit hash ID
```
Find/replace error
```diff
@@ -43,7 +43,7 @@ docker run --rm -ti --gpus=all -p8000:8000 -p8001:8001 -p8002:8002 -v $PWD/model
 tritonserver --model-repository=/models/triton-model-repo \
     --exit-on-error=false \
     --model-control-mode=explicit \
-    --load-model ransomw-model-short-rf
+    --load-model ransomware_model_tl
```
All of our models use hyphens instead of underscores. Can you rename the model to `ransomware-model-tl`?
```diff
-# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
```
Why was this copyright removed?
```diff
@@ -0,0 +1,6 @@
+{
```
Question: What is this file used for?
@bsuryadevara I found this file in the previous model. Do you know if we need it?
@elishahaim It is generated by MLflow; it's not needed.
```diff
@@ -60,7 +60,7 @@ def __init__(
         n_workers: int = 2,
         threads_per_worker: int = 2,
     ):
-        self._client = Client(threads_per_worker=threads_per_worker, n_workers=n_workers)
+        # self._client = Client(threads_per_worker=threads_per_worker, n_workers=n_workers)
```
Excess comment should be removed.
```python
        extract_func = self._fe.extract_features
        combine_func = FeatureExtractor.combine_features
        df['PID_Process'] = df.PID.astype(str) + '_'  # + df.Process
```
Excess comment
@elishahaim do we need to concatenate `_` to `PID_Process` even though the process is being commented out?
I need to verify whether we really need the underscore.
@bsuryadevara I checked it, and we can remove it:

```python
df['PID_Process'] = df.PID.astype(str)
```
```diff
     # Close dask client when pipeline initiates shutdown
-    self._client.close()
+    pass
+    # self._client.close()
```
More excess comments
Can you please remove the on_completed function as we are not using Dask for the `create_features` stage?
""" | ||
This class extends PreprocessBaseStage and process the features that are derived from Appshield data. | ||
This class extends PreprocessBaseStage and process the features that aree derived from Appshield data. |
Typo
```python
                                 offset=0,
                                 count=snapshot_df_size)
        current_time = datetime.datetime.now()
        print(f"Preprocessing snapshot sequence: {sequence} is completed at time: {current_time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
```
Use the logging module instead of print()
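A minimal sketch of that swap, assuming a module-level logger (the `sequence` variable comes from the diff above; the helper name is illustrative):

```python
import datetime
import logging

logger = logging.getLogger(__name__)


def log_snapshot_completed(sequence):
    # Same message as the print() in the diff, but emitted through the
    # logging module so it respects the pipeline's log level and handlers.
    current_time = datetime.datetime.now()
    message = (f"Preprocessing snapshot sequence: {sequence} is completed at time: "
               f"{current_time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
    logger.debug(message)
    return message
```

Using `logger.debug(...)` (or `logger.info(...)`) lets operators silence or redirect this output via the standard logging configuration instead of always writing to stdout.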
/ok to test

@mdemoret-nv sure, will review
""" | ||
return (MessageMeta, ) | ||
|
||
def supports_cpp_node(self): |
Could you add return type annotations to all public functions?
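For example, the two methods in the diff could be annotated along these lines (a sketch; the placeholder class stands in for the real `MessageMeta`, the enclosing method name, and the `bool` return are assumptions):

```python
import typing


class MessageMeta:
    # Placeholder standing in for the pipeline's MessageMeta class.
    pass


class ExampleStage:
    def output_types(self) -> typing.Tuple[type, ...]:
        # Annotated: returns a tuple of message types produced by the stage
        return (MessageMeta, )

    def supports_cpp_node(self) -> bool:
        # Annotated: returns whether a C++ node implementation exists
        return False
```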
```python
            multiple_snapshots.setdefault(source, []).append(scan_id)
        return multiple_snapshots

    def _hold_plugin_df(self, source, scan_id, plugin, plugin_df):
```
Would it be possible to consider adding a time parameter to retain the plugin dataframes in memory?
For instance, when the pipeline receives snapshot 1, plugin 1 processes it, and plugin 2 waits for plugin 3 to bundle its data and push it to the "create features" stage. However, if plugin 3 fails to ingest data into the pipeline for any reason, the memory allocated to plugin 1 and plugin 2 will not be released.
With the addition of a time parameter, we can ensure that plugin 1 and plugin 2 remain in memory only for the duration of the pipeline context, improving memory management and resource efficiency.
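One possible shape for that, sketched with illustrative names (not the actual stage API): track a timestamp per held snapshot and evict entries older than a configurable TTL on each insert.

```python
import time


class PluginDfHolder:
    """Hypothetical sketch: retain held plugin dataframes for at most
    `ttl_seconds`, so incomplete snapshots do not accumulate forever.
    Names and structure are illustrative, not the stage's real API.
    """

    def __init__(self, ttl_seconds: float = 600.0):
        self._ttl = ttl_seconds
        # (source, scan_id) -> (first-seen timestamp, {plugin: dataframe})
        self._held = {}

    def hold(self, source, scan_id, plugin, df):
        self._evict_expired()
        key = (source, scan_id)
        ts, plugins = self._held.get(key, (time.monotonic(), {}))
        plugins[plugin] = df
        self._held[key] = (ts, plugins)

    def _evict_expired(self):
        now = time.monotonic()
        for key in list(self._held):
            ts, _ = self._held[key]
            if now - ts > self._ttl:
                # Snapshot never completed (e.g. a plugin failed upstream);
                # free its memory instead of waiting indefinitely.
                del self._held[key]
```

The TTL bounds how long a partially bundled snapshot can occupy memory while waiting for the remaining plugins.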
Hi @bsuryadevara,
I think we are already removing the old snapshots.
I printed the existing snapshots in the source (dictionary - memory object), and it looks like we already clean it whenever a new snapshot is added:
Do we handle missed sequence snapshots? For example, if snapshots s1 and s2 are received but s3 is skipped and s4 arrives instead, will the workflow retain those snapshots held in memory?
All we have to do is: if a sequence is found to be missing, add a check condition to initiate the cleanup process for the previous snapshots that are stored in memory.
```python
    def _clean_snapshots(self, source, scan_id):
        # Iterate over a copy of the keys: deleting entries while iterating
        # the live dict view would raise a RuntimeError.
        for scan_id_exist in list(source.keys()):
            if scan_id > scan_id_exist + 2:
                del source[scan_id_exist]

    def _hold_plugin_df(self, source, scan_id, plugin, plugin_df):
        if source not in self._plugin_df_dict:
            self._plugin_df_dict[source] = {}
        source = self._plugin_df_dict[source]
        if scan_id not in source:
            source[scan_id] = {}
        snapshot = source[scan_id]
        if plugin not in snapshot:
            snapshot[plugin] = plugin_df
        else:
            snapshot[plugin] = pd.concat([snapshot[plugin], plugin_df])
        self._clean_snapshots(source, scan_id)
```
```python
        metas = []

        for source, df in x.items():
            # Now make a AppShieldMessageMeta with the source name
            meta = AppShieldMessageMeta(df, source)
            metas.append(meta)
```
```diff
-        metas = []
-        for source, df in x.items():
-            # Now make a AppShieldMessageMeta with the source name
-            meta = AppShieldMessageMeta(df, source)
-            metas.append(meta)
+        metas = [AppShieldMessageMeta(df, source) for source, df in x.items()]
```
```diff
         # Amount of files path in handles files
-        file_paths = x[x.Type == 'File'].Name.str.lower()
+        file_paths = x.Name.str.lower()  # x[x.Type == 'File'].Name.str.lower()
```
Excess comment
```diff
 @click.option(
     "--n_dask_workers",
-    default=6,
+    default=1,
     type=click.IntRange(min=1),
     help="Number of dask workers.",
 )
 @click.option(
     "--threads_per_dask_worker",
-    default=2,
+    default=1,
     type=click.IntRange(min=1),
     help="Number of threads per each dask worker.",
 )
```
Can we remove these options, as Dask is not being used?
```diff
@@ -108,20 +110,13 @@ Options:
   --server_url TEXT               Tritonserver url  [required]
   --sliding_window INTEGER RANGE  Sliding window to be used for model input
                                   request  [x>=1]
-  --input_glob TEXT               Input glob pattern to match files to read.
+  --input_topic TEXT              Input Kafka topic for receiving the
```
Could you remove Dask related options?
````diff
@@ -72,10 +72,12 @@ Run the following from the `examples/ransomware_detection` directory to start th
 ```bash
 python run.py --server_url=localhost:8001 \
               --sliding_window=3 \
-              --model_name=ransomw-model-short-rf \
+              --model_name=ransomware_model_tl \
````
Could we include some high-level information about the model, including details about the dataset used for training? Additionally, it would be greatly beneficial to explain how to generate the dataset for training the models, or to specify the data required for running the inference pipeline.
It would also be valuable to create a notebook that demonstrates how to train the model on a sample dataset and run it through the pipeline to showcase its ransomware detection capabilities.
Finally, an explanation of the output structure generated by the pipeline would greatly enhance the comprehensibility of the documentation. Adding this information would be much appreciated.
```diff
-                                    snapshot_ids: typing.List[int],
-                                    source_pid_process: str,
-                                    snapshot_df: pd.DataFrame):
+    def _rollover_pending_snapshots(self, source_pid_process: str, snapshots_dict):
```
Could we also consider adding a time duration to retain the pending rollover snapshots in memory? Without this, they will remain in memory waiting for the sequence (in case the sequences were never ingested into the pipeline).
Hi @bsuryadevara,
I added a function that checks whether the scan_ids are consecutively ordered, like [1, 2, 3].
I think we don't have pending snapshots in memory (I printed them, and they were always cleaned).
I have seen in the past that we can get an anomaly if we have old snapshots, i.e., if we restarted the pipeline but still had snapshots that we hadn't read from Kafka. So it can be problematic.
In this version, we updated the ransomware detection pipeline for the production environment.