batch config
mongoKart committed Oct 16, 2023
1 parent 832ebd5 commit dbf7f33
Showing 6 changed files with 117 additions and 119 deletions.
104 changes: 3 additions & 101 deletions source/batch-mode/batch-read-config.txt
@@ -1,4 +1,5 @@
.. _spark-read-conf:
.. _spark-batch-read-conf:

==========================
Read Configuration Options
@@ -11,11 +12,12 @@ Read Configuration Options
:class: singlecol

.. _spark-input-conf:
.. _spark-batch-input-conf:

Read Configuration
------------------

You can configure the following properties to read from MongoDB:
You can configure the following properties when reading data from MongoDB in batch mode:
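
For context, a minimal batch read that sets a few of these properties through the
``DataFrameReader`` might look like the following sketch. The connection URI,
database, and collection names are placeholders, and ``spark`` is assumed to be an
existing ``SparkSession``; none of these values come from this commit.

.. code-block:: scala

   // Minimal sketch of a batch read; the connection details below are
   // placeholders, not values taken from this page.
   val df = spark.read
     .format("mongodb")
     .option("connection.uri", "mongodb://127.0.0.1/")
     .option("database", "sales")      // hypothetical database
     .option("collection", "orders")   // hypothetical collection
     .load()

   df.printSchema()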

.. note::

@@ -153,14 +155,12 @@ partitioners:
.. _conf-samplepartitioner:

``SamplePartitioner`` Configuration
```````````````````````````````````

.. include:: /includes/sparkconf-partitioner-options-note.rst

You must specify this partitioner using the full classname:
``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner``.
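
As a sketch of how the options in the table below fit together, a batch read might
select this partitioner and tune it as follows. The field name and numeric values
are illustrative, and the connection details are placeholders; ``spark`` is assumed
to be an existing ``SparkSession``.

.. code-block:: scala

   // Illustrative SamplePartitioner configuration on a batch read; the
   // partition field, partition size, and sample count are example values.
   val df = spark.read
     .format("mongodb")
     .option("connection.uri", "mongodb://127.0.0.1/")  // placeholder URI
     .option("database", "sales")                       // hypothetical database
     .option("collection", "orders")                    // hypothetical collection
     .option("partitioner",
       "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
     .option("partitioner.options.partition.field", "_id")
     .option("partitioner.options.partition.size", "64")
     .option("partitioner.options.samples.per.partition", "10")
     .load()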


.. list-table::
:header-rows: 1
:widths: 35 65
@@ -179,7 +179,6 @@ You must specify this partitioner using the full classname:

**Default:** ``64``


* - ``partitioner.options.samples.per.partition``
- The number of samples to take per partition. The total number of
samples taken is:
@@ -205,7 +204,6 @@ You must specify this partitioner using the full classname:

``ShardedPartitioner`` Configuration
`````````````````````````````````````

The ``ShardedPartitioner`` automatically determines the partitions to use
based on your shard configuration.

@@ -216,7 +214,6 @@ You must specify this partitioner using the full classname:

This partitioner is not compatible with hashed shard keys.
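
As a sketch, the partitioner can also be selected through ``SparkConf`` when the
session is created. The ``spark.mongodb.read.`` prefix used here is an assumption
(the note describing SparkConf prefixes is elided from this diff), and the
connection details are placeholders.

.. code-block:: scala

   import org.apache.spark.sql.SparkSession

   // Sketch only: the spark.mongodb.read. prefix and the connection values
   // are assumptions, not taken from this page.
   val spark = SparkSession.builder()
     .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/")
     .config("spark.mongodb.read.database", "sales")     // hypothetical database
     .config("spark.mongodb.read.collection", "orders")  // hypothetical sharded collection
     .config("spark.mongodb.read.partitioner",
       "com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner")
     .getOrCreate()

   val df = spark.read.format("mongodb").load()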


.. _conf-mongopaginatebysizepartitioner:
.. _conf-paginatebysizepartitioner:

@@ -251,13 +248,11 @@ You must specify this partitioner using the full classname:

``PaginateIntoPartitionsPartitioner`` Configuration
```````````````````````````````````````````````````

.. include:: /includes/sparkconf-partitioner-options-note.rst

You must specify this partitioner using the full classname:
``com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner``.


.. list-table::
:header-rows: 1
:widths: 35 65
@@ -287,99 +282,6 @@ You must specify this partitioner using the full classname:

This partitioner creates a single partition.

.. _spark-change-stream-conf:

Change Streams
--------------

.. note::

If you use ``SparkConf`` to set the connector's change stream
configurations, prefix ``spark.mongodb.`` to each property.

.. list-table::
:header-rows: 1
:widths: 35 65

* - Property name
- Description

* - ``change.stream.lookup.full.document``

- Determines what values your change stream returns on update
operations.

The default setting returns the differences between the original
document and the updated document.

The ``updateLookup`` setting returns the differences between the
original document and updated document as well as a copy of the
entire updated document.

**Default:** "default"

.. tip::

For more information on how this change stream option works,
see the MongoDB server manual guide
:manual:`Lookup Full Document for Update Operation </changeStreams/#lookup-full-document-for-update-operations>`.

* - ``change.stream.micro.batch.max.partition.count``
- | The maximum number of partitions the {+connector-short+} divides each
micro-batch into. Spark workers can process these partitions in parallel.
|
| This setting applies only when using micro-batch streams.
|
| **Default**: ``1``

.. warning:: Event Order

Specifying a value larger than ``1`` can alter the order in which
the {+connector-short+} processes change events. Avoid this setting
if out-of-order processing could create data inconsistencies downstream.

* - ``change.stream.publish.full.document.only``
- | Specifies whether to publish the changed document or the full
change stream document.
|
| When this setting is ``true``, the connector exhibits the following behavior:

- The connector filters out messages that
omit the ``fullDocument`` field and only publishes the value of the
field.
- If you don't specify a schema, the connector infers the schema
from the change stream document rather than from the underlying collection.

**Default**: ``false``

.. note::

This setting overrides the ``change.stream.lookup.full.document``
setting.

* - ``change.stream.startup.mode``
- | Specifies how the connector starts up when no offset is available.

| This setting accepts the following values:

- ``latest``: The connector begins processing
change events starting with the most recent event.
It will not process any earlier unprocessed events.
- ``timestamp``: The connector begins processing change events at a specified time.

To use the ``timestamp`` option, you must specify a time by using the
``change.stream.startup.mode.timestamp.start.at.operation.time`` setting.
This setting accepts timestamps in the following formats:

- An integer representing the number of seconds since the
:wikipedia:`Unix epoch <Unix_time>`
- A date and time in
`ISO-8601 <https://www.iso.org/iso-8601-date-and-time-format.html>`__
format with one-second precision
- An extended JSON ``BsonTimestamp``

**Default**: ``latest``

.. _configure-input-uri:

``connection.uri`` Configuration Setting
13 changes: 1 addition & 12 deletions source/batch-mode/batch-read.txt
@@ -2,6 +2,7 @@
.. _scala-read:
.. _java-read:
.. _scala-dataset-filters:
.. _batch-read-from-mongodb:

=================
Read from MongoDB
@@ -45,18 +46,6 @@ Overview

.. include:: /scala/filters.txt

.. important:: Inferring the Schema of a Change Stream

When the {+connector-short+} infers the schema of a data frame
read from a change stream, by default,
it will use the schema of the underlying collection rather than that
of the change stream. If you set the ``change.stream.publish.full.document.only``
option to ``true``, the connector uses the schema of the
change stream instead.

For more information on configuring a read operation, see the
:ref:`spark-change-stream-conf` section of the Read Configuration Options guide.

SQL Queries
-----------

10 changes: 5 additions & 5 deletions source/batch-mode/batch-write-config.txt
@@ -1,23 +1,23 @@
.. _spark-write-conf:
.. _spark-batch-write-conf:

===========================
Write Configuration Options
===========================

.. default-domain:: mongodb

.. contents:: On this page
:local:
:backlinks: none
:depth: 1
:class: singlecol

.. _spark-output-conf:
.. _spark-batch-output-conf:

Write Configuration
-------------------

The following options for writing to MongoDB are available:
You can configure the following properties when writing data to MongoDB in batch mode:
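
For context, a minimal batch write driven by these properties might look like the
following sketch. The save mode and connection details are illustrative, and ``df``
is assumed to be an existing ``DataFrame``; none of these values come from this
commit.

.. code-block:: scala

   // Minimal sketch of a batch write; the connection details below are
   // placeholders, not values taken from this page.
   df.write
     .format("mongodb")
     .mode("append")
     .option("connection.uri", "mongodb://127.0.0.1/")
     .option("database", "sales")      // hypothetical database
     .option("collection", "orders")   // hypothetical collection
     .save()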

.. note::

@@ -169,12 +169,12 @@ The following options for writing to MongoDB are available:
guide on the
:manual:`WriteConcern wtimeout option </reference/write-concern/#wtimeout>`.

.. _configure-output-uri:
.. _configure-batch-output-uri:

``connection.uri`` Configuration Setting
----------------------------------------

You can set all :ref:`spark-output-conf` via the write ``connection.uri``.
You can set all :ref:`spark-batch-output-conf` via the write ``connection.uri``.

.. note::

3 changes: 2 additions & 1 deletion source/batch-mode/batch-write.txt
@@ -1,6 +1,7 @@
.. _write-to-mongodb:
.. _scala-write:
.. _java-write:
.. _batch-write-to-mongodb:

================
Write to MongoDB
@@ -49,4 +50,4 @@ Write to MongoDB
the connector writes the field name and ``null`` value to MongoDB. You can
change this behavior by setting the write configuration property
``ignoreNullValues``. For more information about setting the connector's
write behavior, see :ref:`Write Configuration Options <spark-write-conf>`.
write behavior, see :ref:`Write Configuration Options <spark-batch-write-conf>`.
13 changes: 13 additions & 0 deletions source/streaming-mode/streaming-read.txt
@@ -431,3 +431,16 @@ To stream data from MongoDB to your console:
// run the query
val query = dataStreamWriter.start()

//todo

.. important:: Inferring the Schema of a Change Stream

When the {+connector-short+} infers the schema of a data frame
read from a change stream, by default,
it will use the schema of the underlying collection rather than that
of the change stream. If you set the ``change.stream.publish.full.document.only``
option to ``true``, the connector uses the schema of the
change stream instead.

For more information on configuring a read operation, see the
:ref:`spark-change-stream-conf` section of the Read Configuration Options guide.
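
As a sketch of the behavior described above, a streaming read that sets
``change.stream.publish.full.document.only`` to ``true`` (and therefore infers its
schema from the change stream) might look like the following. The connection
details are placeholders, and ``spark`` is assumed to be an existing
``SparkSession``.

.. code-block:: scala

   // Sketch: publish only the full documents from the change stream, so the
   // connector infers the schema from the change stream rather than from the
   // underlying collection. Connection values are placeholders.
   val streamingDF = spark.readStream
     .format("mongodb")
     .option("connection.uri", "mongodb://127.0.0.1/")
     .option("database", "sales")      // hypothetical database
     .option("collection", "orders")   // hypothetical collection
     .option("change.stream.publish.full.document.only", "true")
     .load()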
93 changes: 93 additions & 0 deletions source/streaming-mode/streaming-write-config.txt
@@ -201,3 +201,96 @@ database for the connection is ``foobar``:

spark.mongodb.write.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.write.database=bar

.. _spark-change-stream-conf:

Change Streams
--------------

.. note::

If you use ``SparkConf`` to set the connector's change stream
configurations, prefix ``spark.mongodb.`` to each property.

.. list-table::
:header-rows: 1
:widths: 35 65

* - Property name
- Description

* - ``change.stream.lookup.full.document``

- Determines what values your change stream returns on update
operations.

The default setting returns the differences between the original
document and the updated document.

The ``updateLookup`` setting returns the differences between the
original document and updated document as well as a copy of the
entire updated document.

**Default:** "default"

.. tip::

For more information on how this change stream option works,
see the MongoDB server manual guide
:manual:`Lookup Full Document for Update Operation </changeStreams/#lookup-full-document-for-update-operations>`.

* - ``change.stream.micro.batch.max.partition.count``
- | The maximum number of partitions the {+connector-short+} divides each
micro-batch into. Spark workers can process these partitions in parallel.
|
| This setting applies only when using micro-batch streams.
|
| **Default**: ``1``

.. warning:: Event Order

Specifying a value larger than ``1`` can alter the order in which
the {+connector-short+} processes change events. Avoid this setting
if out-of-order processing could create data inconsistencies downstream.

* - ``change.stream.publish.full.document.only``
- | Specifies whether to publish the changed document or the full
change stream document.
|
| When this setting is ``true``, the connector exhibits the following behavior:

- The connector filters out messages that
omit the ``fullDocument`` field and only publishes the value of the
field.
- If you don't specify a schema, the connector infers the schema
from the change stream document rather than from the underlying collection.

**Default**: ``false``

.. note::

This setting overrides the ``change.stream.lookup.full.document``
setting.

* - ``change.stream.startup.mode``
- | Specifies how the connector starts up when no offset is available.

| This setting accepts the following values:

- ``latest``: The connector begins processing
change events starting with the most recent event.
It will not process any earlier unprocessed events.
- ``timestamp``: The connector begins processing change events at a specified time.

To use the ``timestamp`` option, you must specify a time by using the
``change.stream.startup.mode.timestamp.start.at.operation.time`` setting.
This setting accepts timestamps in the following formats:

- An integer representing the number of seconds since the
:wikipedia:`Unix epoch <Unix_time>`
- A date and time in
`ISO-8601 <https://www.iso.org/iso-8601-date-and-time-format.html>`__
format with one-second precision
- An extended JSON ``BsonTimestamp``

**Default**: ``latest``
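
As a sketch of the startup-mode options above, a streaming read could begin
processing change events from a specific operation time as follows. The timestamp
and connection details are illustrative, and ``spark`` is assumed to be an existing
``SparkSession``.

.. code-block:: scala

   // Sketch: start processing change events at a specific time. The timestamp
   // and connection values are illustrative only.
   val streamingDF = spark.readStream
     .format("mongodb")
     .option("connection.uri", "mongodb://127.0.0.1/")
     .option("database", "sales")      // hypothetical database
     .option("collection", "orders")   // hypothetical collection
     .option("change.stream.startup.mode", "timestamp")
     .option("change.stream.startup.mode.timestamp.start.at.operation.time",
       "2023-10-16T00:00:00Z")         // ISO-8601 with one-second precision
     .load()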
