Caching in non-library mode throws exception and question regarding performance tests #135

Open
ChrisWint opened this issue May 19, 2020 · 0 comments

I was playing around with the PerformanceTest example and wanted to try the multithreaded, non-library mode. If I understood the config correctly, this is achieved by setting the OwnedThreads stream scheduler.
However, once I do this, a simple GroupAggregate causes the Cache() call to throw a NotImplementedException in StreamableIO.cs:179.
I have included the full code to reproduce the issue below, along with a dump of the config. Is there something I need to do in order to use Cache() in multithreaded mode, or is this a bug?

I have tried multiple configuration combinations for the OwnedThreads scheduler, but all of them result in the same error. The same happens when I use intervals instead of StartEdge events. I ran into the same issue with equi-joins as well, but the grouping example is simpler.

If caching is not intended for multithreaded modes, is there a way to achieve similarly efficient in-memory caching of the results that I could use to measure the performance of my queries?
Also, are there other settings in the config I should fine-tune to achieve the best throughput for multithreaded execution?

Thanks!

public static void Main(string[] args)
{
    // Multithreaded (non-library) mode: the scheduler owns two threads.
    Config.StreamScheduler = StreamScheduler.OwnedThreads(2);

    var rand = new Random();
    var dataset =
        Observable.Range(0, 100000)
            .Select(e => StreamEvent.CreateStart(1, new { field1 = e % 1000, field2 = rand.Next(1, 100) }))
            .ToStreamable()
            .SetProperty().IsConstantDuration(true, StreamEvent.InfinitySyncTime)
            .Cache();

    var query = dataset.Where(d => d.field1 < 950).GroupAggregate(
        d => d.field1,
        w => w.Sum(v => v.field2),
        w => w.Count(),
        (key, sum, count) => new { key.Key, sum, count });

    // Run the query
    var sw = new Stopwatch();
    GC.Collect();
    GC.WaitForPendingFinalizers();
    sw.Start();

    var result = query.Cache();

    GC.Collect();
    GC.WaitForPendingFinalizers();
    sw.Stop();

    // Count the output rows (the query runs once, so there is no averaging)
    var rowCount = 0ul;
    result.Count().ToStreamEventObservable().ForEachAsync(r => rowCount += r.Payload).Wait();
    Console.WriteLine("Query took {0} ms, had {1} rows", sw.ElapsedMilliseconds, rowCount);
}

Config:

{ MapArity = 2,
 ReduceArity = 2,
 ForceRowBasedExecution = False,
 DeterministicWithinTimestamp = True,
 ClearColumnsOnReturn = False,
 DisableMemoryPooling = False,
 DataBatchSize = 80000,
 UseMultiString = False,
 IngressSortingTechnique = ImpatienceSort,
 MultiStringTransforms = None,
 Scheduler = OwnedThreadsScheduler(Map:2, Reduce:2),
 GeneratedCodePath = Generated,
 GenerateDebugInfo = True,
 BreakIntoCodeGen = None,
 DontFallBackToRowBasedExecution = False,
 SuperStrictColumnar = False,
 CodeGenAfa = True }