Troubleshooting: Using Cosmos DB Aggregates
When using the Spark connector with Cosmos DB aggregates (currently `COUNT`, `SUM`, `AVG`, `MIN`, and `MAX`), you may use the `VALUE` keyword, e.g.:

```sql
SELECT VALUE COUNT(1) FROM c WHERE c.origin = 'SEA'
```
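For illustration, a `VALUE` projection strips the property name from the result, so the query above returns bare scalars rather than JSON documents (the count shown is the sample value from the output later on this page):

```python
# SELECT VALUE COUNT(1) ...            -> bare scalars, no property name
[23078]

# SELECT COUNT(1) ... (without VALUE)  -> documents with a generated name
[{'$1': 23078}]
```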
Because a `VALUE` projection has no schema defined, it can cause an error downstream. Let's use the example below, which uses `pyDocumentDB`:
```python
# Import the necessary libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents

# Configure the connection policy (allowing for endpoint discovery)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery = True
connectionPolicy.PreferredLocations = ["Central US", "East US 2", "West US", "Southeast Asia", "West Europe", "Canada Central"]

# Set the keys to connect to Cosmos DB
masterKey = '<YourMasterKey>'
host = 'https://doctorwho.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)

# Configure the database and collection
databaseId = 'DepartureDelays'
collectionId = 'flights_pcoll'

# Links the Cosmos DB client will use to connect to the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

# Set the query string
querystr = "SELECT VALUE COUNT(1) FROM c WHERE c.origin = 'SEA'"

# Query the documents
query = client.QueryDocuments(collLink, querystr, options={'enableCrossPartitionQuery': True}, partition_key=None)

# Convert to a DataFrame
df = spark.createDataFrame(list(query))
```
This will result in the error:
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 524, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 386, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 318, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/types.py", line 992, in _infer_schema
    raise TypeError("Can not infer schema for type: %s" % type(row))
TypeError: Can not infer schema for type: <type 'int'>
```
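The failure is easy to reproduce without Cosmos DB at all; a minimal sketch, assuming an active `spark` session:

```python
from pyspark.sql.types import IntegerType

# A bare int is not a Row, dict, or tuple, so schema inference fails:
# spark.createDataFrame([23078])  # raises TypeError: Can not infer schema ...

# Supplying an explicit element type succeeds:
spark.createDataFrame([23078], IntegerType()).show()
```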
As the traceback shows, the error occurs because Spark SQL cannot infer a schema for the type `int`. To solve this, explicitly define the schema, in this case `IntegerType()`, and rewrite the last line of code as:
```python
from pyspark.sql.types import IntegerType

df = spark.createDataFrame(list(query), IntegerType())
df.show()
```

```
+-----+
|value|
+-----+
|23078|
+-----+
```
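Alternatively, you can sidestep the schema problem by dropping `VALUE` and aliasing the aggregate, so each result comes back as a JSON document whose schema Spark can infer. A sketch, where the alias name `flightCount` is illustrative rather than from the original example:

```python
from pyspark.sql import Row

# Without VALUE, the aggregate comes back as a document such as
# {'flightCount': 23078}, which maps cleanly onto a Row
querystr = "SELECT COUNT(1) AS flightCount FROM c WHERE c.origin = 'SEA'"
query = client.QueryDocuments(collLink, querystr, options={'enableCrossPartitionQuery': True}, partition_key=None)

df = spark.createDataFrame([Row(**doc) for doc in query])
df.show()
```

The same idea applies to the other aggregates. Note that if you keep the `VALUE` form, the schema type must match the result type; `AVG`, for example, returns a float, so you would pass `DoubleType()` instead of `IntegerType()`.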