Kafka producer failover mechanism and validation of data being pushed to topic



























I have written code to push data to a Kafka topic on a daily basis, but there are a few issues that I am not sure this code can handle. My responsibility is to push the complete data from a live table that holds one day of data (refreshed every morning).

My code runs "select * from mytable" and pushes the rows to the Kafka topic one by one, because I need to validate/alter each row before pushing it.

Below is my producer send code.



    Properties configProperties = new Properties();
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, sBOOTSTRAP_SERVERS_CONFIG);
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    configProperties.put("acks", "all");
    configProperties.put("retries", 0);
    configProperties.put("batch.size", 15000);
    configProperties.put("linger.ms", 1);
    configProperties.put("buffer.memory", 30000000);
    @SuppressWarnings("resource")
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
    System.out.println("Starting Kafka producer job " + new Date());
    producer.send(new ProducerRecord<String, String>(eventName, jsonRec.toString()), new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            }
        }
    });


Now, I am not sure how to push data into the topic again in case of failure: I have selected all the records from the table, a few of them failed, and I do not know which ones.



Below is what I want to address:

  1. How can I process only those records that were not pushed, so that no duplicate records are pushed (avoid redundancy)?

  2. How can I validate that the records pushed are exactly the same as in the table? I mean data integrity, such as the size of the data and the count of records pushed.











      java apache-kafka














      edited Nov 23 '18 at 16:14









      cricket_007











      asked Nov 23 '18 at 11:01









      user1708054

























          1 Answer














You can use configProperties.put("enable.idempotence", true); the producer will then retry failed sends while making sure that only one copy of each record is written to Kafka. Note that it requires retries > 0, acks=all and max.in.flight.requests.per.connection <= 5. For details, check https://kafka.apache.org/documentation/.
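As a rough illustration of those settings, here is a minimal sketch that builds the properties with plain string keys. The class name IdempotentProducerConfig, the retry count of 3 and the localhost:9092 address are placeholders chosen for this example, not values from the question. Note that the retries=0 from the question would have to change, since idempotence needs retries to be greater than 0.

```java
import java.util.Properties;

// Hypothetical helper: collects the producer settings that idempotence relies on.
final class IdempotentProducerConfig {

    /** Builds producer properties with idempotence switched on. */
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("enable.idempotence", "true");
        props.put("acks", "all");                                 // required by idempotence
        props.put("retries", "3");                                // illustrative value; must be > 0
        props.put("max.in.flight.requests.per.connection", "5");  // must be <= 5
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address; these properties would be passed to
        // new KafkaProducer<String, String>(props) in the real job.
        Properties props = build("localhost:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The values are given as strings because the Kafka client parses them into the right types itself.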



For the 2nd question: if you mean that you need to save either all records or none, then you have to use Kafka transactions, which raises a lot more questions. I would recommend reading https://www.confluent.io/blog/transactions-apache-kafka/
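If the 2nd question is rather about checking the count and content of what was pushed, a different and lighter technique than transactions would be to fingerprint the rows on both sides and compare the results. The sketch below is only an assumption about how that could look: BatchFingerprint is a hypothetical helper (not a Kafka API), one instance would be filled while sending and another by a consumer reading the topic back, and only the first 8 bytes of each SHA-256 hash are summed, so it is a sanity check rather than a proof.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper: order-independent fingerprint of a batch of records.
final class BatchFingerprint {
    private long count;   // number of records seen
    private long hashSum; // wrapping sum of per-record hashes, independent of order

    void add(String record) {
        count++;
        // Use the first 8 bytes of the SHA-256 digest as this record's hash.
        hashSum += ByteBuffer.wrap(sha256(record)).getLong();
    }

    boolean matches(BatchFingerprint other) {
        return count == other.count && hashSum == other.hashSum;
    }

    long count() {
        return count;
    }

    static BatchFingerprint of(Iterable<String> records) {
        BatchFingerprint fp = new BatchFingerprint();
        for (String r : records) {
            fp.add(r);
        }
        return fp;
    }

    private static byte[] sha256(String s) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        // Same records in a different order, as could happen across partitions.
        BatchFingerprint sent = of(Arrays.asList("{\"id\":1}", "{\"id\":2}"));
        BatchFingerprint read = of(Arrays.asList("{\"id\":2}", "{\"id\":1}"));
        System.out.println(sent.matches(read)); // prints true
    }
}
```

A sum is used instead of hashing the whole stream because records spread over several partitions are not read back in the order they were sent.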






























          • So for the 1st point, will it query the table again and try to push the data that has not already been pushed? I will go through the documentation and get back. Thanks for your reply :)

            – user1708054
            Nov 23 '18 at 14:07













          • No, it won't query the DB again. The Kafka producer does not know anything about your DB, so if you call producer.send a second time anywhere for the same record, you will get duplicates. Retries work internally in the Kafka client code: the producer knows it should try to send the same record again because, for example, it did not get an ack from the Kafka broker.

            – freakman
            Nov 23 '18 at 14:29
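Since the producer knows nothing about the table, finding out which rows finally failed has to happen in the application, for example in the send callback. A minimal sketch, assuming each row has some identifier such as a primary key: FailedRecordTracker and the row keys are hypothetical names for this example, and the Kafka wiring is only indicated in a comment so the block runs without the client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: remembers which rows the producer gave up on.
final class FailedRecordTracker {
    // Callbacks run on the producer's I/O thread, so use a thread-safe queue.
    private final Queue<String> failedKeys = new ConcurrentLinkedQueue<>();

    /** Intended to be called from the send callback; e is null on success. */
    void onCompletion(String rowKey, Exception e) {
        if (e != null) {
            failedKeys.add(rowKey);
        }
    }

    /** Snapshot of the row keys whose send ultimately failed. */
    List<String> failedKeys() {
        return new ArrayList<>(failedKeys);
    }

    public static void main(String[] args) {
        FailedRecordTracker tracker = new FailedRecordTracker();
        // In the real job this would be wired up roughly as:
        //   producer.send(record, (metadata, e) -> tracker.onCompletion(rowKey, e));
        tracker.onCompletion("row-1", null);
        tracker.onCompletion("row-2", new RuntimeException("simulated send failure"));
        System.out.println(tracker.failedKeys()); // prints [row-2]
    }
}
```

After producer.flush() returns, the collected keys could be used to re-read and re-send only those rows, instead of running the whole "select * from mytable" again.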











          • Thanks for the info. I will explore more and update here.

            – user1708054
            Nov 26 '18 at 19:49











          answered Nov 23 '18 at 12:28









          freakman













