Skip to content

Commit

Permalink
Revert "unlock the GVL for compression/decompression operations"
Browse files Browse the repository at this point in the history
  • Loading branch information
SpringMT authored Mar 28, 2024
1 parent d682a90 commit 1f4a225
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 123 deletions.
46 changes: 4 additions & 42 deletions ext/zstdruby/streaming_compress.c
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#include <common.h>
#include <streaming_compress.h>
#include <ruby/thread.h>

struct streaming_compress_t {
ZSTD_CCtx* ctx;
VALUE buf;
size_t buf_size;
char nogvl;
};

static void
Expand Down Expand Up @@ -54,18 +52,11 @@ static VALUE
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
{
VALUE compression_level_value;
VALUE kwargs;
rb_scan_args(argc, argv, "01:", &compression_level_value, &kwargs);
rb_scan_args(argc, argv, "01", &compression_level_value);
int compression_level = convert_compression_level(compression_level_value);

ID kwargs_keys[1];
kwargs_keys[0] = rb_intern("no_gvl");
VALUE kwargs_values[1];
rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values);

struct streaming_compress_t* sc;
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
sc->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]);
size_t const buffOutSize = ZSTD_CStreamOutSize();

ZSTD_CCtx* ctx = ZSTD_createCCtx();
Expand All @@ -85,35 +76,6 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
: (FIX2INT((val))))
#define ARG_CONTINUE(val) FIXNUMARG((val), ZSTD_e_continue)

struct compress_stream_nogvl_t {
ZSTD_CCtx* ctx;
ZSTD_outBuffer* output;
ZSTD_inBuffer* input;
ZSTD_EndDirective endOp;
size_t ret;
};

static void*
compressStream2_nogvl(void* arg)
{
struct compress_stream_nogvl_t* params = arg;
params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp);
return NULL;
}

static size_t
compressStream2(char nogvl, ZSTD_CCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
{
struct compress_stream_nogvl_t params = { ctx, output, input, endOp, 0 };
if (nogvl) {
rb_thread_call_without_gvl(compressStream2_nogvl, &params, NULL, NULL);
}
else {
compressStream2_nogvl(&params);
}
return params.ret;
}

static VALUE
no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
{
Expand All @@ -124,7 +86,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
do {
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };

size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, endOp);
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, endOp);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
}
Expand All @@ -147,7 +109,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
VALUE result = rb_str_new(0, 0);
while (input.pos < input.size) {
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue);
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
}
Expand All @@ -170,7 +132,7 @@ rb_streaming_compress_addstr(VALUE obj, VALUE src)

while (input.pos < input.size) {
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
size_t const result = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue);
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
if (ZSTD_isError(result)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
}
Expand Down
47 changes: 4 additions & 43 deletions ext/zstdruby/streaming_decompress.c
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include <common.h>
#include <ruby/thread.h>

struct streaming_decompress_t {
ZSTD_DCtx* ctx;
VALUE buf;
size_t buf_size;
char nogvl;
};

static void
Expand Down Expand Up @@ -50,19 +48,10 @@ rb_streaming_decompress_allocate(VALUE klass)
}

static VALUE
rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj)
rb_streaming_decompress_initialize(VALUE obj)
{
VALUE kwargs;
rb_scan_args(argc, argv, "00:", &kwargs);

ID kwargs_keys[1];
kwargs_keys[0] = rb_intern("no_gvl");
VALUE kwargs_values[1];
rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values);

struct streaming_decompress_t* sd;
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
sd->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]);
size_t const buffOutSize = ZSTD_DStreamOutSize();

ZSTD_DCtx* ctx = ZSTD_createDCtx();
Expand All @@ -76,34 +65,6 @@ rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj)
return obj;
}

struct decompress_stream_nogvl_t {
ZSTD_DCtx* ctx;
ZSTD_outBuffer* output;
ZSTD_inBuffer* input;
size_t ret;
};

static void*
decompressStream_nogvl(void* args)
{
struct decompress_stream_nogvl_t* params = args;
params->ret = ZSTD_decompressStream(params->ctx, params->output, params->input);
return NULL;
}

static size_t
decompressStream(char nogvl, ZSTD_DCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
struct decompress_stream_nogvl_t params = { ctx, output, input, 0 };
if (nogvl) {
rb_thread_call_without_gvl(decompressStream_nogvl, &params, NULL, NULL);
}
else {
decompressStream_nogvl(&params);
}
return params.ret;
}

static VALUE
rb_streaming_decompress_decompress(VALUE obj, VALUE src)
{
Expand All @@ -118,7 +79,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
VALUE result = rb_str_new(0, 0);
while (input.pos < input.size) {
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
size_t const ret = decompressStream(sd->nogvl, sd->ctx, &output, &input);
size_t const ret = ZSTD_decompressStream(sd->ctx, &output, &input);
if (ZSTD_isError(ret)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
}
Expand All @@ -141,7 +102,7 @@ rb_streaming_decompress_addstr(VALUE obj, VALUE src)

while (input.pos < input.size) {
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
size_t const result = decompressStream(sd->nogvl, sd->ctx, &output, &input);
size_t const result = ZSTD_decompressStream(sd->ctx, &output, &input);
if (ZSTD_isError(result)) {
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
}
Expand All @@ -155,7 +116,7 @@ zstd_ruby_streaming_decompress_init(void)
{
VALUE cStreamingDecompress = rb_define_class_under(rb_mZstd, "StreamingDecompress", rb_cObject);
rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate);
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, -1);
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, 0);
rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1);
rb_define_method(cStreamingDecompress, "<<", rb_streaming_decompress_addstr, 1);
}
Expand Down
30 changes: 9 additions & 21 deletions spec/zstd-ruby-streaming-compress_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
require "spec_helper"
require 'zstd-ruby'

shared_examples "a streaming compressor" do
RSpec.describe Zstd::StreamingCompress do
describe '<<' do
it 'shoud work' do
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
stream = Zstd::StreamingCompress.new
stream << "abc" << "def"
res = stream.finish
expect(Zstd.decompress(res)).to eq('abcdef')
Expand All @@ -13,7 +13,7 @@

describe '<< + GC.compat' do
it 'shoud work' do
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
stream = Zstd::StreamingCompress.new
stream << "abc" << "def"
GC.compact
stream << "ghi"
Expand All @@ -24,7 +24,7 @@

describe '<< + flush' do
it 'shoud work' do
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
stream = Zstd::StreamingCompress.new
stream << "abc" << "def"
res = stream.flush
stream << "ghi"
Expand All @@ -35,7 +35,7 @@

describe 'compress + flush' do
it 'shoud work' do
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
stream = Zstd::StreamingCompress.new
res = stream.compress("abc")
res << stream.flush
res << stream.compress("def")
Expand All @@ -46,7 +46,7 @@

describe 'compression level' do
it 'shoud work' do
stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl)
stream = Zstd::StreamingCompress.new(5)
stream << "abc" << "def"
res = stream.finish
expect(Zstd.decompress(res)).to eq('abcdef')
Expand All @@ -56,26 +56,14 @@
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0')
describe 'Ractor' do
it 'should be supported' do
r = Ractor.new(no_gvl) do |no_gvl|
stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl)
r = Ractor.new {
stream = Zstd::StreamingCompress.new(5)
stream << "abc" << "def"
res = stream.finish
end
}
expect(Zstd.decompress(r.take)).to eq('abcdef')
end
end
end
end

RSpec.describe Zstd::StreamingCompress do
describe "with the global lock" do
let(:no_gvl) { false }
it_behaves_like "a streaming compressor"
end

describe "without the global lock" do
let(:no_gvl) { true }
it_behaves_like "a streaming compressor"
end
end

22 changes: 5 additions & 17 deletions spec/zstd-ruby-streaming-decompress_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
require 'zstd-ruby'
require 'securerandom'

shared_examples "a streaming decompressor" do
RSpec.describe Zstd::StreamingDecompress do
describe 'streaming decompress' do
it 'shoud work' do
# str = SecureRandom.hex(150)
Expand All @@ -22,7 +22,7 @@
# str = SecureRandom.hex(150)
str = "foo bar buzz" * 100
cstr = Zstd.compress(str)
stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl)
stream = Zstd::StreamingDecompress.new
result = ''
result << stream.decompress(cstr[0, 5])
result << stream.decompress(cstr[5, 5])
Expand All @@ -35,30 +35,18 @@
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0')
describe 'Ractor' do
it 'should be supported' do
r = Ractor.new(no_gvl) do |no_gvl|
r = Ractor.new {
cstr = Zstd.compress('foo bar buzz')
stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl)
stream = Zstd::StreamingDecompress.new
result = ''
result << stream.decompress(cstr[0, 5])
result << stream.decompress(cstr[5, 5])
result << stream.decompress(cstr[10..-1])
result
end
}
expect(r.take).to eq('foo bar buzz')
end
end
end
end

RSpec.describe Zstd::StreamingDecompress do
describe "with the gvl" do
let(:no_gvl) { false }
it_behaves_like "a streaming decompressor"
end

describe "without the gvl" do
let(:no_gvl) { true }
it_behaves_like "a streaming decompressor"
end
end

0 comments on commit 1f4a225

Please sign in to comment.