Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various issues like #190, #229 (#231)

Merged
merged 8 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions chdb/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ def main():
sql = options.sql[0]
output_format = options.format
res = query(sql, output_format)
if output_format.lower() in ('dataframe', 'arrowtable'):
temp = res
else:
temp = res.data()
print(temp, end="")
try:
if output_format.lower() in ("dataframe", "arrowtable"):
temp = res
else:
temp = res.data()
print(temp, end="")
except UnicodeDecodeError:
print(repr(res.bytes()))


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion chdb/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ CMAKE_ARGS="-DCMAKE_BUILD_TYPE=${build_type} -DENABLE_THINLTO=0 -DENABLE_TESTS=0
-DENABLE_UTILS=0 ${LLVM} ${UNWIND} \
${ICU} -DENABLE_UTF8PROC=1 ${JEMALLOC} \
-DENABLE_PARQUET=1 -DENABLE_ROCKSDB=1 -DENABLE_SQLITE=1 -DENABLE_VECTORSCAN=1 \
-DENABLE_PROTOBUF=1 -DENABLE_THRIFT=1 \
-DENABLE_PROTOBUF=1 -DENABLE_THRIFT=1 -DENABLE_MSGPACK=1 \
-DENABLE_RAPIDJSON=1 \
-DENABLE_BROTLI=1 -DENABLE_H3=1 \
-DENABLE_CLICKHOUSE_ALL=0 -DUSE_STATIC_LIBRARIES=1 -DSPLIT_SHARED_LIBRARIES=0 \
Expand Down
1 change: 1 addition & 0 deletions programs/local/LocalChdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ local_result_v2 * queryToBuffer(
// Add path string
argv.push_back("--path=" + path);
}
// argv.push_back("--no-system-tables");
// Add query string
argv.push_back("--query=" + queryStr);

Expand Down
4 changes: 4 additions & 0 deletions programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1050,10 +1050,14 @@ class query_result_
};


// global mutex for all local servers
static std::mutex CHDB_MUTEX;

std::unique_ptr<query_result_> pyEntryClickHouseLocal(int argc, char ** argv)
{
try
{
std::lock_guard<std::mutex> lock(CHDB_MUTEX);
DB::LocalServer app;
app.init(argc, argv);
int ret = app.run();
Expand Down
66 changes: 58 additions & 8 deletions src/Common/PythonUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <string>
#include <Common/PythonUtils.h>
#include "config.h"

Expand All @@ -6,10 +7,10 @@
#include <cstddef>
#include <pybind11/gil.h>
#include <pybind11/pytypes.h>

#include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <pybind11/numpy.h>
#include <utf8proc.h>
#include <Columns/ColumnString.h>
#include <Common/logger_useful.h>

namespace DB
{
Expand Down Expand Up @@ -122,13 +123,40 @@ ConvertPyUnicodeToUtf8(const void * input, int kind, size_t codepoint_cnt, Colum
return offset; // Return the number of bytes written, not including the null terminator
}

/// Extract a NUL-terminated buffer and its length from a Python object.
/// `str` objects are normalized to UTF-8 (cached by CPython on the object);
/// anything else is handed to PyBytes_AsStringAndSize unchanged.
/// Returns 0 on success, -1 on failure (with a Python error set).
/// Acquires the GIL itself, so callers need not hold it.
int PyString_AsStringAndSize(PyObject * ob, char ** charpp, Py_ssize_t * sizep)
{
    // Rare slow path: always hand back UTF-8 for unicode input.
    py::gil_scoped_acquire acquire;
    if (!PyUnicode_Check(ob))
    {
        // bytes (or bytes-like) object: delegate directly.
        return PyBytes_AsStringAndSize(ob, charpp, sizep);
    }
    // PyUnicode_AsUTF8AndSize returns a pointer owned by `ob`; it is valid
    // for the object's lifetime, so no copy is needed here.
    const char * utf8 = PyUnicode_AsUTF8AndSize(ob, sizep);
    *charpp = const_cast<char *>(utf8);
    return utf8 == nullptr ? -1 : 0;
}

void FillColumnString(PyObject * obj, ColumnString * column)
{
ColumnString::Offsets & offsets = column->getOffsets();
ColumnString::Chars & chars = column->getChars();
// if obj is bytes
// if (PyBytes_Check(obj))
// {
// // convert bytes to string
// column->insertData(data, bytes_size);
// }
// else
if (PyUnicode_IS_COMPACT_ASCII(obj))
{
const char * data = reinterpret_cast<const char *>(PyUnicode_1BYTE_DATA(obj));
// if obj is unicode
const char * data = reinterpret_cast<const char *>(PyUnicode_DATA(obj));
size_t unicode_len = PyUnicode_GET_LENGTH(obj);
column->insertData(data, unicode_len);
}
Expand Down Expand Up @@ -160,10 +188,10 @@ void FillColumnString(PyObject * obj, ColumnString * column)
}
else
{
// always convert it to utf8, but this case is rare, here goes the slow path
py::gil_scoped_acquire acquire;
Py_ssize_t bytes_size = -1;
const char * data = PyUnicode_AsUTF8AndSize(obj, &bytes_size);
// const char * data = PyUnicode_AsUTF8AndSize(obj, &bytes_size);
char * data = nullptr;
bytes_size = PyString_AsStringAndSize(obj, &data, &bytes_size);
if (bytes_size < 0)
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Failed to convert Python unicode object to UTF-8");
column->insertData(data, bytes_size);
Expand Down Expand Up @@ -266,7 +294,7 @@ bool _isInheritsFromPyReader(const py::handle & obj)

// Will try to get the ref of py::array from pandas Series, or PyArrow Table
// without import numpy or pyarrow. Just from class name for now.
const void * tryGetPyArray(const py::object & obj, py::handle & result, std::string & type_name, size_t & row_count)
const void * tryGetPyArray(const py::object & obj, py::handle & result, py::handle & tmp, std::string & type_name, size_t & row_count)
{
py::gil_scoped_acquire acquire;
type_name = py::str(obj.attr("__class__").attr("__name__")).cast<std::string>();
Expand All @@ -283,6 +311,28 @@ const void * tryGetPyArray(const py::object & obj, py::handle & result, std::str
// Try to get the handle of py::array from pandas Series
py::array array = obj.attr("values");
row_count = py::len(obj);
// if element type is bytes or object, we need to convert it to string
// chdb todo: need more type check
if (row_count > 0)
{
auto elem_type = obj.attr("__getitem__")(0).attr("__class__").attr("__name__").cast<std::string>();
if (elem_type == "str" || elem_type == "unicode")
{
result = array;
return array.data();
}
if (elem_type == "bytes" || elem_type == "object")
{
// chdb todo: better handle for bytes and object type
auto str_obj = obj.attr("astype")(py::dtype("str"));
array = str_obj.attr("values");
result = array;
tmp = array;
tmp.inc_ref();
return array.data();
}
}

result = array;
return array.data();
}
Expand Down
12 changes: 11 additions & 1 deletion src/Common/PythonUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ struct ColumnWrapper
void * buf; // we may modify the data when cast it to PyObject **, so we need a non-const pointer
size_t row_count;
py::handle data;
py::handle tmp; // hold some tmp data like hits['Title'].astype("str")
DataTypePtr dest_type;
std::string py_type; //py::handle type, eg. numpy.ndarray;
std::string row_format;
std::string encoding; // utf8, utf16, utf32, etc.
std::string name;

~ColumnWrapper()
{
    // Release the extra reference taken on the temporary conversion result
    // (e.g. the array produced by astype("str") in tryGetPyArray).
    // `tmp` is a default-constructed py::handle (null PyObject*) when no
    // temporary was ever stored; is_none() does NOT cover that case, and
    // dec_ref() on a null handle is Py_DECREF(nullptr) — undefined behavior.
    // Guard on ptr() first.
    py::gil_scoped_acquire acquire;
    if (tmp.ptr() != nullptr && !tmp.is_none())
    {
        tmp.dec_ref();
    }
}
};

using PyObjectVec = std::vector<py::object>;
Expand Down Expand Up @@ -194,7 +204,7 @@ inline std::vector<py::object> readData(const py::object & data_source, const st
return execWithGIL([&]() { return data_source.attr("read")(names, cursor, count).cast<std::vector<py::object>>(); });
}

const void * tryGetPyArray(const py::object & obj, py::handle & result, std::string & type_name, size_t & row_count);
const void * tryGetPyArray(const py::object & obj, py::handle & result, py::handle & tmp, std::string & type_name, size_t & row_count);

} // namespace DB
#endif
45 changes: 35 additions & 10 deletions src/Processors/Sources/PythonSource.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Processors/Sources/PythonSource.h>
#include "base/scope_guard.h"

#if USE_PYTHON
#include <cstddef>
Expand Down Expand Up @@ -147,7 +148,12 @@ ColumnPtr PythonSource::convert_and_insert(const py::object & obj, UInt32 scale)
std::string type_name;
size_t row_count;
py::handle py_array;
const void * data = tryGetPyArray(obj, py_array, type_name, row_count);
py::handle tmp;
SCOPE_EXIT({
if (!tmp.is_none())
tmp.dec_ref();
});
const void * data = tryGetPyArray(obj, py_array, tmp, type_name, row_count);
if (!py_array.is_none())
{
if constexpr (std::is_same_v<T, String>)
Expand Down Expand Up @@ -416,19 +422,38 @@ Chunk PythonSource::generate()
if (names.empty())
return {};

if (isInheritsFromPyReader(data_source))
try
{
PyObjectVecPtr data;
py::gil_scoped_acquire acquire;
data = std::move(castToSharedPtrVector<py::object>(data_source.attr("read")(names, max_block_size)));
if (data->empty())
return {};
if (isInheritsFromPyReader(data_source))
{
PyObjectVecPtr data;
py::gil_scoped_acquire acquire;
data = std::move(castToSharedPtrVector<py::object>(data_source.attr("read")(names, max_block_size)));
if (data->empty())
return {};

return std::move(genChunk(num_rows, data));
return std::move(genChunk(num_rows, data));
}
else
{
return std::move(scanDataToChunk());
}
}
else
catch (const Exception & e)
{
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Python data handling {}", e.what());
}
catch (const std::exception & e)
{
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Python data handling {}", e.what());
}
catch (const py::error_already_set & e)
{
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Python data handling {}", e.what());
}
catch (...)
{
return std::move(scanDataToChunk());
throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Python data handling unknown exception");
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ void printExceptionWithRespectToAbort(LoggerPtr log, const String & query_id)
if (ex == nullptr)
return;

tryLogCurrentException(log, "Exception while executing background task {" + query_id + "}");
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
tryLogCurrentException(log, "Exception while executing background task {" + query_id + "}");
});
// try
// {
// std::rethrow_exception(ex);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StoragePython.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void StoragePython::prepareColumnCache(const Names & names, const Columns & colu
try
{
py::object col_data = data_source[py::str(col_name)];
col.buf = const_cast<void *>(tryGetPyArray(col_data, col.data, col.py_type, col.row_count));
col.buf = const_cast<void *>(tryGetPyArray(col_data, col.data, col.tmp, col.py_type, col.row_count));
if (col.buf == nullptr)
throw Exception(
ErrorCodes::PY_EXCEPTION_OCCURED, "Convert to array failed for column {} type {}", col_name, col.py_type);
Expand Down
14 changes: 12 additions & 2 deletions tests/arrow_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import chdb
import chdb.dataframe as cdf
import chdb.session as chs
import pandas as pd
import numpy as np
import pyarrow as pa
Expand Down Expand Up @@ -37,6 +38,8 @@

sql = "SELECT COUNT(DISTINCT UserID) FROM hits;"

# sql = "SELECT REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(STRLEN(Referer)) AS l, COUNT(*) AS c, MIN(Referer) FROM hits WHERE Referer <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;"

t = time.time()
# read parquet file into memory
with open(hits_0, "rb") as f:
Expand Down Expand Up @@ -65,6 +68,8 @@
# # hits[col] = hits[col].astype('string')
# hits[col] = hits[col].astype(str)

hits["Referer"] = hits["Referer"].astype(str)

# title = hits["Title"]
# title.values.data

Expand Down Expand Up @@ -216,17 +221,22 @@ def read(self, col_names, count):

reader = myReader(df_old)

sess = chs.Session()
# sess.query("set aggregation_memory_efficient_merge_threads=2;")

sql = sql.replace("STRLEN", "length")

def bench_chdb(i):
if i == 0:
format = "Debug"
else:
format = "DataFrame"
ret = chdb.query(
ret = sess.query(
# """ SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
# FROM Python(reader) GROUP BY RegionID ORDER BY c DESC LIMIT 10""",
# "SELECT COUNT(DISTINCT Title) FROM Python(reader);",
sql.replace("hits", "Python(hits)"),
"set aggregation_memory_efficient_merge_threads=3;"
+ sql.replace("hits", "Python(hits)"),
format,
)
return ret
Expand Down
Loading