Skip to content

Commit

Permalink
Update WriteToVectorDBStage
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Oct 19, 2023
1 parent dce7991 commit 55052e7
Showing 1 changed file with 5 additions and 9 deletions.
14 changes: 5 additions & 9 deletions morpheus/stages/output/write_to_vector_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
from morpheus.service.vector_db_service import VectorDBService
from morpheus.utils.vector_db_service_utils import VectorDBServiceFactory

logger = logging.getLogger(__name__)


class WriteToVectorDBStage(SinglePortStage):
class WriteToVectorDBStage(PassThruTypeMixin, SinglePortStage):
"""
Writes messages to a Vector Database.
Expand Down Expand Up @@ -100,9 +100,7 @@ def on_completed(self):
# Close vector database service connection
self._service.close()

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:

stream = input_stream[0]
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:

def on_data(ctrl_msg: ControlMessage) -> ControlMessage:
# Insert entries in the dataframe to vector database.
Expand All @@ -116,8 +114,6 @@ def on_data(ctrl_msg: ControlMessage) -> ControlMessage:

to_vector_db = builder.make_node(self.unique_name, ops.map(on_data), ops.on_completed(self.on_completed))

builder.make_edge(stream, to_vector_db)
stream = to_vector_db
builder.make_edge(input_node, to_vector_db)

# Return input unchanged to allow passthrough
return stream, input_stream[1]
return to_vector_db

0 comments on commit 55052e7

Please sign in to comment.