Merge streamed individual rows from source striped tables #1
base: master
Conversation
…max internal chunk size to stop the stream consumer from being flooded with huge objects.
In general, I think getting rid of the chunks is a very good idea. When there are lots of files being merged, taking the minimum of the block maximums each time a new block was added was probably yielding only a few rows per iteration, and we just ended up slicing the vectors hundreds of times.
So I like the approach.
I'll have to cherry-pick the changes onto my icicle-lang repo and go through them properly to review the code well.
There's probably a bit too much going back and forth between lists, streams, vectors, and cons vectors; most of it the compiler should elide, but it's hard to know.
fromJust $ Cons.index 0 kvss

2 -> do
  let mergeS s1 s2 = remapStreamEnd $ Stream.merge s1 s2
The remap can just be void, can't it?
Also on this line, should it be mergeBy rowKey?
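Putting both suggestions together, a sketch of what the merge could look like, assuming the streaming library's Streaming.Prelude imported as Stream and a generic key projection standing in for rowKey (the exact combinator names in this codebase may differ):

```haskell
import Control.Monad (void)
import Data.Function (on)
import qualified Streaming.Prelude as Stream

-- Merge two key-ordered streams of rows; void discards the paired
-- end-of-stream results instead of remapping them.
mergeS ::
     (Monad m, Ord key)
  => (row -> key)                        -- rowKey in the diff
  -> Stream.Stream (Stream.Of row) m ()
  -> Stream.Stream (Stream.Of row) m ()
  -> Stream.Stream (Stream.Of row) m ()
mergeS key s1 s2 =
  void (Stream.mergeBy (compare `on` key) s1 s2)
```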
Stream.mapMaybeM (hoistEither . mergeRows msize schema) $
Stream.filter (/= []) $
Stream.mapped Stream.toList $
There's a toList and then a Boxed.fromList almost immediately afterwards. You might be able to just map the stream with rowValues and use FoldL.impurely S.foldM FoldL.vectorM to build the boxed vector.
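A minimal sketch of that, assuming the foldl library imported as FoldL and Streaming.Prelude as S (the exact name of vectorM varies between foldl versions):

```haskell
import Control.Monad.Primitive (PrimMonad)
import qualified Control.Foldl as FoldL
import qualified Data.Vector as Boxed
import qualified Streaming.Prelude as S

-- Build a boxed vector directly from a stream, with no intermediate list.
streamToVector ::
     PrimMonad m
  => S.Stream (S.Of a) m r
  -> m (S.Of (Boxed.Vector a) r)
streamToVector =
  FoldL.impurely S.foldM FoldL.vectorM
```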
I'm not sure if list stream fusion will work through the call (though it might?).
Sorry, I only noticed this recently.
Stream.mapMaybeM (hoistEither . mergeRows msize schema) $
Stream.filter (/= []) $
Stream.mapped Stream.toList $
Stream.groupBy compKey $
I think what's happening here is that you're merging the streams using a binary tree, such that the final stream is ordered on the row key, then using groupBy to get all the rows which share a key as a stream or list of vectors, then turning that back into a binary tree using the function mergeValues.
So I think this is a bit roundabout. It should be possible to merge the streams themselves in a binary fashion (and I believe it would make doing it in parallel easier).
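As a rough sketch of merging the streams pairwise, assuming Streaming.Prelude imported as S (the real combinators and the handling of equal keys in this codebase will differ):

```haskell
import Control.Monad (void)
import qualified Streaming.Prelude as S

-- Merge a list of key-ordered streams pairwise, so the merges form a
-- balanced binary tree rather than a left-nested chain.
mergeAllBy ::
     Monad m
  => (a -> a -> Ordering)
  -> [S.Stream (S.Of a) m ()]
  -> S.Stream (S.Of a) m ()
mergeAllBy cmp streams =
  case streams of
    []  -> pure ()
    [s] -> s
    ss  -> mergeAllBy cmp (pairwise ss)
  where
    pairwise (x : y : rest) =
      void (S.mergeBy cmp x y) : pairwise rest
    pairwise rest =
      rest
```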
It's cool. I've left a few comments.
unionInputGroupBy schema msize inputs0 = do
  let
    compKey :: Row -> Row -> Bool
    compKey a b = (rowKey a) == (rowKey b)
This is (==) `on` rowKey by the way, with on from Data.Function.
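That is, assuming the Row type and rowKey from the diff above:

```haskell
import Data.Function (on)

compKey :: Row -> Row -> Bool
compKey = (==) `on` rowKey
```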
Two changes that I started a while ago and have just got around to finishing off. It would be good to get some feedback on them.
Stream each source striped table as individual logical rows.
I think it greatly simplifies the merging logic and seems to create less overhead than doing the merges as blocks of entities.
This was also a precursor to various attempts to run stages of it concurrently, as the merge command is single threaded, and also an attempt to use memory more efficiently.
Split the source striped table when necessary, to stop memory being swamped with huge objects (a large logical table) when the source file wasn't properly chunked.
This was the case for the output of the scatter job.
This simple change alone sped up the merge task a fair amount with my dummy test data (reducing GC time by roughly 20%).
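A generic sketch of what the splitting step amounts to, assuming Streaming.Prelude imported as S and a hypothetical split function standing in for however the striped table is actually divided:

```haskell
import qualified Streaming.Prelude as S

-- Cap the size of the blocks a stream yields: each incoming block is split
-- into pieces of at most n rows before being passed downstream.
rechunkBy ::
     Monad m
  => (Int -> block -> [block])  -- hypothetical: split one block into pieces of at most n rows
  -> Int                        -- maximum rows per emitted block
  -> S.Stream (S.Of block) m r
  -> S.Stream (S.Of block) m r
rechunkBy split n =
  S.concat . S.map (split n)
```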