Join optimisation in the case of unbalanced datasets
I have two datasets to be LEFT joined:
Dataset A: ~10,000 parquet files, each ~300 KB
Dataset B: ~50,000 parquet files, each ~30 MB
I want to join on a string column that is common to both datasets, say "name".
One important point: every row in Dataset A has a match in Dataset B, but Dataset B also contains many other rows.
A plain join takes very long and fails in most cases, so I am asking whether there is an optimisation. For example, would partitioning Dataset B alphabetically on the "name" column be a good idea? A broadcast join will not work because Dataset A is not small enough.
apache-spark pyspark aws-glue
asked Nov 23 '18 at 14:00 by Sinan Erdem
Have you tried bucketing? – Steven, Nov 23 '18 at 14:03
No, can you expand please? – Sinan Erdem, Nov 23 '18 at 14:05
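For reference, a minimal sketch of the plain left join described in the question, which is the baseline the answer below tries to speed up (the S3 paths are hypothetical; "name" is the join key from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input locations.
df_A = spark.read.parquet("s3://bucket/dataset_A/")  # ~10,000 files x ~300 KB (~3 GB total)
df_B = spark.read.parquet("s3://bucket/dataset_B/")  # ~50,000 files x ~30 MB (~1.5 TB total)

# The straightforward left join that "takes very long and fails": every row of A
# should find a match in B, but B also holds many rows with no counterpart in A.
result = df_A.join(df_B, on="name", how="left")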
1 Answer
If you can bucketize your files before joining, that is probably best. Otherwise, you need one extra write step to use bucketing.

# Write both datasets as bucketed tables, hashed into the same number of buckets on the join key.
(df_A.write.format('parquet')
    .bucketBy(10, 'name')
    .mode("overwrite")
    .saveAsTable('bucketed_table_A'))

(df_B.write.format('parquet')
    .bucketBy(10, 'name')
    .mode("overwrite")
    .saveAsTable('bucketed_table_B'))

Bucketing allows you to pre-shuffle your data. Both df_A and df_B should have the same number of buckets. Choosing the number of buckets is a difficult "art" that depends on your data and your configuration.

Then you read the bucketed tables back and join them on "name":

spark.table('bucketed_table_A').join(
    spark.table('bucketed_table_B'),
    on='name',
    how='left'
)

Doing that, you transfer the computing time from the join step to the write/bucketing step. But you only do it once, and then you can re-use the bucketed tables many times.
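As a quick sanity check (a minimal sketch, assuming the two bucketed tables above were written as shown and spark is the active SparkSession), the physical plan of the join should no longer contain a shuffle on either side:

# Bucketed reads are enabled by default, but the setting can be made explicit.
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")

joined = spark.table('bucketed_table_A').join(
    spark.table('bucketed_table_B'), on='name', how='left')

# With matching bucket counts, the plan should show a SortMergeJoin with no
# "Exchange hashpartitioning(name, ...)" step feeding either input.
joined.explain()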
answered Nov 23 '18 at 14:19 by Steven
Thank you for the suggestion. I am using AWS Glue, and its Spark version is lower than 2.3, so it doesn't support bucketBy. Do you know any alternative? – Sinan Erdem, Nov 23 '18 at 15:04
What about partitionBy? Can you use it? Is there any column you could use as a partition? – Steven, Nov 23 '18 at 15:06
Is the effect the same as bucketBy for the join? I can create a column with the starting letter of the "name" column. – Sinan Erdem, Nov 23 '18 at 15:07
That's potentially a good idea, I think. The effect is not the same, and it may produce some unexpected results. – Steven, Nov 23 '18 at 15:09
Another improvement is to "compact" dataframes A and B: 300 KB per file is far too small. Reduce the number of files and increase their size; the ideal file size is around 200-300 MB. – Steven, Nov 23 '18 at 15:11
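For the AWS Glue / Spark < 2.3 situation discussed above, where bucketBy is unavailable, here is a rough sketch of the first-letter idea from the comments (paths and column names are hypothetical; spark is an existing SparkSession). A partition column only prunes reads when it is filtered on, so the sketch filters Dataset B to the prefixes that actually occur in Dataset A; as the comments note, this is not equivalent to bucketing and only helps if A covers a subset of the prefixes.

from pyspark.sql import functions as F

# Derive a one-letter prefix of the join key on both sides (hypothetical column name).
df_A = spark.read.parquet("s3://bucket/dataset_A/") \
    .withColumn("name_prefix", F.upper(F.substring("name", 1, 1)))
df_B = spark.read.parquet("s3://bucket/dataset_B/") \
    .withColumn("name_prefix", F.upper(F.substring("name", 1, 1)))

# One-time cost: rewrite Dataset B partitioned by the prefix so later reads can prune.
df_B.write.mode("overwrite").partitionBy("name_prefix") \
    .parquet("s3://bucket/dataset_B_by_prefix/")

# At join time, read only the partitions of B whose prefixes actually occur in A.
prefixes = [r["name_prefix"] for r in df_A.select("name_prefix").distinct().collect()]
df_B_pruned = spark.read.parquet("s3://bucket/dataset_B_by_prefix/") \
    .where(F.col("name_prefix").isin(prefixes))

result = df_A.join(df_B_pruned, on=["name_prefix", "name"], how="left")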
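On the last comment about compacting: a minimal sketch of reducing the file counts before the join (target file counts assume roughly 256 MB output files; paths are hypothetical). Dataset A is only about 3 GB in total (10,000 x 300 KB), while Dataset B is roughly 1.5 TB (50,000 x 30 MB):

# Compact Dataset A: ~3 GB total, so roughly a dozen ~256 MB files.
spark.read.parquet("s3://bucket/dataset_A/") \
    .repartition(12) \
    .write.mode("overwrite").parquet("s3://bucket/dataset_A_compacted/")

# Compact Dataset B: ~1.5 TB total, so on the order of 6,000 ~256 MB files.
# Repartitioning on the join key at the same time also pre-clusters the rows.
spark.read.parquet("s3://bucket/dataset_B/") \
    .repartition(6000, "name") \
    .write.mode("overwrite").parquet("s3://bucket/dataset_B_compacted/")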