
Troubleshooting: Using Cosmos DB Aggregates

Fabian Meiswinkel edited this page Oct 13, 2021 · 2 revisions

When working with the Spark connector and Cosmos DB aggregates (currently COUNT, SUM, AVG, MIN, MAX), you may use the VALUE keyword, e.g.

SELECT VALUE COUNT(1) FROM c WHERE c.origin = 'SEA'
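Without VALUE, Cosmos DB wraps an unnamed aggregate in a generated projection key such as `$1`; with VALUE, each row comes back as a bare scalar. A minimal sketch of the two row shapes (the literal values here are illustrative, not live query results):

```python
# Illustrative row shapes (hypothetical values, not a live query):
row_without_value = {"$1": 23078}  # SELECT COUNT(1) ...       -> one dict per row
row_with_value = 23078             # SELECT VALUE COUNT(1) ... -> one bare int per row

print(type(row_without_value).__name__)  # dict
print(type(row_with_value).__name__)     # int
```

The bare scalar is what trips up Spark's schema inference in the example below.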

But the result of a VALUE query has no schema defined, which may result in an error. Let's use the example below (which uses pyDocumentDB):


# Import Necessary Libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
import datetime

# Configuring the connection policy (allowing for endpoint discovery)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery = True
connectionPolicy.PreferredLocations = ["Central US", "East US 2", "West US", "Southeast Asia", "Western Europe","Canada Central"]

# Set keys to connect to Cosmos DB 
masterKey = '<YourMasterKey>' 
host = 'https://doctorwho.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)

# Configure Database and Collections
databaseId = 'DepartureDelays'
collectionId = 'flights_pcoll'

# Configurations the Cosmos DB client will use to connect to the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

# Set query parameter
querystr = "SELECT VALUE COUNT(1) FROM c WHERE c.origin = 'SEA'"  

# Query documents
query = client.QueryDocuments(collLink, querystr, options= { 'enableCrossPartitionQuery': True }, partition_key=None)

# Convert to DataFrame
df = spark.createDataFrame(list(query))

This will result in the error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 524, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 386, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 318, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 992, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <type 'int'>

As noted, this error occurs because Spark SQL cannot infer the schema for type int. To solve this problem, you will need to explicitly define the schema, in this case IntegerType(). Replace the last line of code with:

from pyspark.sql.types import IntegerType
df = spark.createDataFrame(list(query), IntegerType())
df.show()
+-----+
|value|
+-----+
|23078|
+-----+
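Alternatively, if you prefer to let Spark infer the schema, you can wrap each bare scalar in a dictionary before building the DataFrame, so that inference has a field name and type to work with. A sketch reusing the COUNT result above (the column name `value` is an arbitrary choice, not part of the query result):

```python
# Wrap each bare scalar returned by the VALUE query in a dict so schema
# inference can derive a named column and its type.
results = [23078]  # stand-in for list(query) from the COUNT example above
rows = [{"value": v} for v in results]
print(rows)  # [{'value': 23078}]
# spark.createDataFrame(rows) would then infer a single 'value' column.
```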