Join optimisation in case of unbalanced datasets
I have two datasets that I need to LEFT join:

Dataset A: ~10,000 Parquet files, each ~300 KB

Dataset B: ~50,000 Parquet files, each ~30 MB
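For scale, the approximate totals implied by those figures can be worked out directly. This is a rough estimate that assumes every file is exactly the quoted size. The 10 MB value is Spark's default for `spark.sql.autoBroadcastJoinThreshold` and is included only for comparison.

```python
# Rough total size of each input, assuming every file has exactly the
# approximate size quoted above (binary units: 1 KB = 1024 bytes).
KB = 1024
MB = 1024 * KB
GB = 1024 * MB

total_a = 10_000 * 300 * KB  # Dataset A: ~10,000 files x ~300 KB
total_b = 50_000 * 30 * MB   # Dataset B: ~50,000 files x ~30 MB

# Spark's default spark.sql.autoBroadcastJoinThreshold is 10 MB.
default_broadcast_threshold = 10 * MB

print(f"Dataset A: ~{total_a / GB:.1f} GiB")
print(f"Dataset B: ~{total_b / GB:.1f} GiB")
print(f"B is ~{total_b / total_a:.0f}x larger than A")
print(f"A is ~{total_a / default_broadcast_threshold:.0f}x the default broadcast threshold")
```

So A is roughly 3 GB and B roughly 1.5 TB, a ratio of about 1:500.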



I want to join on a string column that exists in both datasets, say "name".



One important point: every row in Dataset A has a match in Dataset B, but Dataset B also contains many other rows that have no match in A.



The plain join takes a very long time and fails in most cases. Is there a way to optimise it? For example, is partitioning Dataset B alphabetically on the "name" column a good idea? A broadcast join will not work because Dataset A is not small enough.
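One quick way to judge the alphabetical idea is to check how evenly a sample of names would spread across partitions. The pure-Python sketch below compares first-letter partitions with hash buckets of the full name. The sample names are made up, and `zlib.crc32` is only a stand-in for the hash Spark uses internally, so treat this as a rough diagnostic rather than a model of Spark.

```python
import zlib
from collections import Counter

def first_letter_partition(name: str) -> str:
    """Alphabetical scheme from the question: partition key = first character."""
    return name[:1].upper() or "#"

def hash_bucket(name: str, num_buckets: int) -> int:
    """Hash scheme: stable hash of the full name modulo the bucket count.
    crc32 is a stand-in; Spark uses its own hash function."""
    return zlib.crc32(name.encode("utf-8")) % num_buckets

def partition_sizes(names, key):
    """Count how many names land in each partition under the given key function."""
    return Counter(key(n) for n in names)

# Hypothetical sample; replace with a sample of the real "name" column.
names = ["Smith", "Sanchez", "Schmidt", "Silva", "Santos", "Stone",
         "Adams", "Brown", "Miller", "Martin", "Zhang", "Stevens"]

by_letter = partition_sizes(names, first_letter_partition)
by_hash = partition_sizes(names, lambda n: hash_bucket(n, 4))

print("first letter:", dict(by_letter))
print("hash buckets:", dict(by_hash))
print("largest first-letter partition:", max(by_letter.values()))
```

In this made-up sample, "S" alone holds 7 of the 12 names. Real name columns are often similarly uneven across initial letters, which would translate into a few oversized partitions.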
apache-spark pyspark aws-glue
asked Nov 23 '18 at 14:00









Sinan Erdem
  • Have you tried bucketing?

    – Steven
    Nov 23 '18 at 14:03











  • No, can you expand on that, please?

    – Sinan Erdem
    Nov 23 '18 at 14:05
1 Answer
If your data can be bucketed when it is first written, that is probably best.
Otherwise, you need one extra write step to bucket it:



# Assumes df_A and df_B are already loaded. In PySpark, bucketBy requires
# Spark 2.3+ and only works together with saveAsTable.
(df_A.write.format('parquet')
    .bucketBy(10, 'name')
    .mode('overwrite')
    .saveAsTable('bucketed_table_A'))

(df_B.write.format('parquet')
    .bucketBy(10, 'name')
    .mode('overwrite')
    .saveAsTable('bucketed_table_B'))


Bucketing lets you pre-shuffle your data: rows with the same "name" are written to the same bucket, so the join does not have to shuffle them again.
df_A and df_B should have the same number of buckets. Choosing the number of buckets is something of an "art" and depends on your data and your cluster configuration.



Then read the bucketed tables back and join them on "name":



joined = spark.table('bucketed_table_A').join(
    spark.table('bucketed_table_B'),
    on='name',
    how='left'
)


This moves the cost from the join step to the write/bucketing step. You pay it once, and can then reuse the bucketed tables for many joins.
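To make the "pre-shuffle" idea concrete, here is a pure-Python sketch (not Spark code) of a bucketed left join: each side is hashed into the same number of buckets once, and the join then only pairs bucket i of A with bucket i of B. It uses `zlib.crc32` as a stand-in for Spark's hash function, and the rows are hypothetical toy data.

```python
import zlib
from collections import defaultdict

def bucket_of(name: str, num_buckets: int) -> int:
    # Stable hash of the join key modulo the bucket count.
    # zlib.crc32 is a stand-in; Spark uses its own hash function.
    return zlib.crc32(name.encode("utf-8")) % num_buckets

def bucketize(rows, num_buckets):
    """'Write' step: assign every (name, value) row to a bucket once, up front."""
    buckets = defaultdict(list)
    for name, value in rows:
        buckets[bucket_of(name, num_buckets)].append((name, value))
    return buckets

def bucketed_left_join(buckets_a, buckets_b, num_buckets):
    """'Join' step: bucket i of A is only matched against bucket i of B,
    so no row has to move to another bucket at join time."""
    result = []
    for i in range(num_buckets):
        # Build a lookup from B's bucket i only.
        lookup = defaultdict(list)
        for name, b_value in buckets_b.get(i, []):
            lookup[name].append(b_value)
        # Left join: keep every A row, with None when B has no match.
        for name, a_value in buckets_a.get(i, []):
            for b_value in lookup.get(name, [None]):
                result.append((name, a_value, b_value))
    return result

NUM_BUCKETS = 10  # same bucket count on both sides, as in bucketBy(10, 'name')

# Hypothetical toy rows standing in for Dataset A and Dataset B.
rows_a = [("alice", 1), ("bob", 2), ("carol", 3)]
rows_b = [("alice", "x"), ("bob", "y"), ("carol", "z"), ("dave", "w"), ("erin", "v")]

joined = bucketed_left_join(
    bucketize(rows_a, NUM_BUCKETS), bucketize(rows_b, NUM_BUCKETS), NUM_BUCKETS
)
print(sorted(joined))
```

Because a key's bucket index depends on the bucket count, bucketing A into 10 buckets and B into, say, 7 could put the same name into different bucket indexes on each side, and bucket-by-bucket matching would miss it. That is why Spark generally has to reshuffle when the bucket counts differ.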
answered Nov 23 '18 at 14:19









Steven
  • Thank you for the suggestion. I am using AWS Glue, whose Spark version is lower than 2.3, so it doesn't support bucketBy. Do you know of an alternative?

    – Sinan Erdem
    Nov 23 '18 at 15:04











  • What about partitionBy? Can you use it? Is there a column you could partition on?

    – Steven
    Nov 23 '18 at 15:06











  • Does it have the same effect as bucketBy for the join? I can create a column with the first letter of the "name" column.

    – Sinan Erdem
    Nov 23 '18 at 15:07











  • I think that's potentially a good idea. The effect is not the same, but it may give some unexpected results.

    – Steven
    Nov 23 '18 at 15:09






  • Another improvement is to "compact" dataframes A and B. 300 KB per file is too small: reduce the number of files and increase their size. The ideal size is around 200-300 MB.

    – Steven
    Nov 23 '18 at 15:11
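To turn the compaction advice into numbers, the sketch below estimates how many files each dataset would need at roughly 256 MB per file (an assumed target inside the suggested 200-300 MB range), using the approximate sizes from the question. Real on-disk sizes and Parquet compression after rewriting may shift these figures.

```python
import math

MB = 1024 * 1024

def target_file_count(total_bytes: int, target_file_bytes: int = 256 * MB) -> int:
    """How many output files give roughly target_file_bytes per file (at least 1)."""
    return max(1, math.ceil(total_bytes / target_file_bytes))

# Approximate totals from the question (binary units).
total_a = 10_000 * 300 * 1024   # ~10,000 files x ~300 KB
total_b = 50_000 * 30 * MB      # ~50,000 files x ~30 MB

files_a = target_file_count(total_a)
files_b = target_file_count(total_b)
print(f"Dataset A: ~{files_a} files, Dataset B: ~{files_b} files")

# Hypothetical PySpark usage (path_a is a placeholder, not from the thread):
# df_A.repartition(files_a).write.mode("overwrite").parquet(path_a)
```

Under these assumptions, A shrinks from ~10,000 files to about a dozen, while B goes from ~50,000 files to a few thousand.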