Friday, July 31, 2020

Java random notes for quick reference


Lambda & CheckedException/RuntimeException
If you try to use methods that throw any checked exception in a Lamda expression - it will complain - one solution is to change to /use custom RuntimeException instead of checked exception

SimpleDateFormat is not thread-safe: WHY? - Alternatives?

SDF keeps intermediate results in instance fields (Calendar) - so if one instance is used by 2 threads, can mess it.

Sample:
DateTimeFormatValidator{
private SimpleDateFormat sdf = null; //NOT thread safe

          private DateTimeFormatter formatter = null;//thread safe

private org.apache.commons.lang3.FastDateFormat fdf = null;//thread safe

public void inititialize(String formatStr){ 

    //sdf = new SimpleDateFormat(formatStr);

             formatter = DateTimeFormatter.ofPattern(formatStr);

public boolean isValidFormat(Object dateStr){
    sdf.format(dateStr);// not thread safe

    formatter.parse(dateStr); //thread safe

}
}


Java WebApps - UnsatisfiedLinkError - native library


As of Tomcat versions 9.0.138.5.35, and 7.0.92 we have added the following options to address this issue BZ-62830:

1) Use the JniLifecycleListener to load the native library.

e.g. to load the opencv_java343 library, you can use:

<Listener className="org.apache.catalina.core.JniLifecycleListener"
          libraryName="opencv_java343" />

2) Use the load() or loadLibrary() from org.apache.tomcat.jni.Library instead of System.

e.g.

org.apache.tomcat.jni.Library.loadLibrary("opencv_java343");

Using either of those options will use the Common ClassLoader to load the native library, and therefore it will be available to all of the Web Apps.

Java 8 - Streams


Java Process - Fire and Forget


Process p = Runtime.getRuntime().exec( "command" );
p.waitFor(); // current thread will wait until the process "p" is completed/interrupted.



Threads - the old way - t1.start() , t1.join();

/**
 * Call 2 expensive methods on separate threads 
 *    
 * @throws InterruptedException 
 */
public void doMultiThreadedJob() throws InterruptedException {
  /* create Runnable using anonymous inner class */
  Thread t1 = new Thread(new Runnable() { 
    public void run() {
      System.out.println("starting expensive task thread t1");
      doSomethingExpensive(); 
      System.out.println("finished expensive task thread t1");
    }
  });
  /* start processing on new threads */
  t1.start();
  /* block current thread until t1 has finished */
  t1.join();
}

Dealing with threads directly can be cumbersome, so Oracle simplified things by providing a layer of abstraction via its Executor API. An Executor allows you to process tasks asynchronously without having to deal with threads directly.

The Executors factory class is used to create an instance of an Executor, either an ExecutorService or an ScheduledExecutorServiceSome of the most common types of Executor are described below.
  • Executors.newCachedThreadPool() — An ExecutorService with a thread pool that creates new threads as required but reuses previously created threads as they become available.
  • Executors.newFixedThreadPool(int numThreads) — An ExecutorServicethat has a thread pool with a fixed number of threads. The numThreads parameter is the maximum number of threads that can be active in the ExecutorService at any one time. If the number of requests submitted to the pool exceeds the pool size, requests are queued until a thread becomes available.
  • Executors.newScheduledThreadPool(int numThreads) — A ScheduledExecutorServicewith a thread pool that is used to run tasks periodically or after a specified delay.
  • Executors.newSingleThreadExecutor() — An ExecutorService with a single thread. Tasks submitted will be executed one at a time and in the order submitted.
  • Executors.newSingleThreadScheduledExecutor() — An ExecutorService that uses a single thread to execute tasks periodically or after a specified delay.

execute(Runnable)

The execute method takes a Runnable and is useful when you want to run a task and are not concerned about checking its status or obtaining a result. Think of it as fire and forget asynchronous task.
executorService.execute(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  doSomethingExpensive();    
}

Future<?> submit(Runnable)

Like execute(), the submit() method also takes a Runnable but differs from execute()because it returns a Future. A Future is an object that represents the pending response from an asynchronous task. Think of it as a handle that can be used to check the status of the task or retrieve its result when the task completes. Futures use generics to allow you to specify the return type of the task. However, given that the Runnablerun() method has the return type void, the Future holds the status of the task rather than a pending result. This is represented as Future<?> in the example below.
Future<?> taskStatus = executorService.submit(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  doSomethingExpensive();    
}

Thesubmit(Runnable)method is useful when you want to run a task that doesn't return a value but you'd like to check the status of the task after it's been submitted to the ExecutorService.

Checking the Status of a Task

Future has a few useful methods for checking the status of a task that's been submitted to the ExecutorService.
  • isCancelled() checks if the submitted task has already been canceled.
  • isDone() checks if the submitted task has already completed. When a task has finished, isDone will return true whether the task completed successfully, unsuccessfully, or was canceled.
  • cancel() cancels the submitted task. A boolean parameter specifies whether or not the task should be interrupted if it has already started.
/* check if both tasks have completed - if not sleep current thread 
 * for 1 second and check again
 */
while(!task1Future.isDone() || !task2Future.isDone()){
  System.out.println("Task 1 and Task 2 are not yet complete....sleeping");
  Thread.sleep(1000);
}
Future<T> submit(Callable)
The submitmethod is overloaded to take a Callable as well as a Runnable. Like a Runnable, a Callable represents a task that is executed on another thread. A Callable differs from a Runable because it returns a value and can throw a checked Exception. The Callable interface has a single abstract method public T call() throws Exception and like Runable can be implemented with an anonymous inner class or lambda. The return type of the call() method is used to type the Future returned by the ExecutorService. Two code snippets below show how a Callable can be created via an anonymous inner class and a lambda expression.
Future<Double> task1Future = executorService.submit(new Callable<Double>() {
  public Double call() throws Exception {
    System.out.println(String.format("starting expensive task thread %s", 
        Thread.currentThread().getName()));
    Double returnedValue = someExpensiveRemoteCall();
    return returnedValue;
  } 
});
Future<Double> task2Future = executorService.submit(()->{
  System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
  Double returnedValue = someExpensiveRemoteCall();
  return returnedValue;
});

Both examples create a Callable and pass it to the execute method. The Callable is executed as soon as a thread is available.

Getting a Result from a Future

When a Callable is submitted to the ExecutorService, we receive a Future with the return type of the call() method. In the example above, call() returns a Double so we get a Future<Double>. One way of retrieving the result from a Future is by calling its get() method. get() will block indefinitely waiting on the submitted task to complete. If the task doesn't complete or takes a long time to complete, the main application thread will remain blocked.
Waiting indefinitely for a result is usually not ideal. We'd rather have more control over how we retrieve the result and take some action if a task doesn't complete within a certain amount of time. Luckily there's an overloaded get(long timeout, TimeUnit unit) method that waits for the specified period of time and if the task hasn't finished (result not available), throws a TimeoutException.
Double value1 = task1Future.get();
Double value2 = task2Future.get(4,  TimeUnit.SECONDS); // throws TimeoutException

Submitting Multiple Callables


Builder Pattern

The builder pattern is a good choice when designing classes whose constructors or static factories would have more than a handful of parameters.


Private constructor, builder class , with setter returning builder obj, helps to chain the setters














Tuesday, July 28, 2020

My AWS experiments n notes





AWS Login issue - rare case - worth seeing :)

502 ERROR
The request could not be satisfied.


The origin closed the connection. We can't connect to the server for this app or website at this time. There might be too much traffic or a configuration error. Try again later, or contact the app or website owner.
If you provide content to customers through CloudFront, you can find steps to troubleshoot and help prevent this error by reviewing the CloudFront documentation.

Generated by cloudfront (CloudFront)
Request ID: 7ZVY4aMiDfJQZ_jKVREL72qokqXwV0TVxyjgiFhzteD2yyOe5QL5MQ==


My Route53 migration (Domain is still under the other AWS account)

There were multiple hurdles

New certificate procured for the domain to attach with the CloudFront Distribution

Validation did not work even after multiple retries using "add recordset to the Route53 hostedZone" option. It was being added and says successful, but goes away and validation was not completing
Later, I had to recreate and validate using email authentication

This issue might be because I created new hosted zone for the domain in this new account, however, I did not update the name servers @ DNS entry to match this new hosted zone (I had deleted the old hosted zone few days before fully setting up the new one)

I noticed below AWS page very late


Site note reachable - Error: DNS_PROBE_FINISHED_NXDOMAIN

This is found to be a browser issue as per below link
https://kinsta.com/knowledgebase/dns_probe_finished_nxdomain/

AuthorizedHeaderMalformed from CloudFront

Though I got this error, when I tried from a different laptop (VPN), and at the same time, (for so many hours) my personal laptop was showing me that the site is not available (forgot the exact message) 
However, all on a sudden it started working for me, which was a surprise for me, as I did not expect this long time for the changes to get reflected

Now I assume the reason for the long delay being my bad config and then default TTL for the NS record (was 172800 sec - 48 hours) - Now I made it to just 1 minute (but should have been cached @)




 

Sunday, July 19, 2020

Learning Links



Window Functions (Hive)

https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_analytic_functions.html
Window Functions (Spark DataFrame)
http://xinhstechblog.blogspot.com/2016/04/spark-window-functions-for-dataframes.html

Java 8 - loops


InputStream.range - call innerClass - effectively final contraint is no more
https://developer.ibm.com/technologies/java/articles/j-java8idioms3/ 
Mutables vs Parameters : The variable i, which we defined in our for loop, is a single variable that is mutated through each iteration of the loop. The variable i in the range example is a parameter to the lambda expression, so it’s a brand new variable in each iteration.  

Tuesday, July 14, 2020

MapRDB HBase notes (copyTable etc)


copyTableMapR-DB table
https://mapr.com/docs/52/ReferenceGuide/CopyTable.html
hbase com.mapr.fs.hbase.tools.mapreduce.CopyTable
    -src <source table path> -dst <destination table path>
    [-columns cf1[:col1],...] [-maxversions <max number of versions to copy>]
    [-starttime <time>]
    [-endtime <time>]
    [-mapreduce <true|false> (default: true)]
    [-bulkload <true|false> (default: true)]
    [-numthreads <numThreads> (default:16, valid only when -mapreduce is false)]

copyTableMapR-DB Json table
https://mapr.com/docs/60/ReferenceGuide/mapr_copytable.html
Copies data from one MapR-DB JSON table to another MapR-DB JSON table.
mapr copytable 
-src <source table path>
-dst <destination table path>
[-fromID <start key>]
[-toID <end key>]
[-bulkload <true|false> (default: true)]
[-mapreduce <true|false> (default: true)]
[-cmpmeta <true|false> (default: true)]
[-numthreads <number of threads> (default: 16)

truncate : MapR-DB table
truncate command does the 'disable, drop and create' for you

Sunday, July 5, 2020

Unix / Shell Script - Notes


* GREP - Print characters before and after a pattern match
    grep -oP '.{0,5}searchStr.{0,10}'

* GREP all characters after the match
   grep -oP 'searchStr.*'

GREP all characters before the match

   grep -oP '.*searchStr'

* Reverse/Inverse grep: grep -v  to get all non-matching lines

Monday, June 29, 2020

Kafka Consumer - types - atleast-once , exactly-once



·          At-most-once Kafka Consumer (Zero or More Deliveries)

1)                     enable.auto.commit’ to true

2)                     small value for auto.commit.interval.ms

3)                     don’t call  consumer.commitSync()

·          At-least-once Kafka Consumer (One or More Message Deliveries, Duplicate Possible)

1)                     (enable.auto.commit to true with high value for auto.commit.interval.ms

2)                     OR enable.auto.commit false)

3)                     AND don’t call  consumer.commitSync()

implement ‘idempotent’ behavior within consumer to avoid reprocessing of the duplicate messages

·          Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)

1)                     enable.auto.commit = false.

2)                     Don't call consumer.commitSync(); after processing the message.

3)                     Subscribe with listener - in ConsumerRebalanceListener perform consumer.seek(topicPartition,offset); to start reading

4)                     Store the processed message's offset (RDBMS) as a single atomic-transaction. If NoSQL - Store the offset along with the message.

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      for (TopicPartition partition : partitions) {
         offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
                }
        }
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      for (TopicPartition partition : partitions) {
          consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
                }
        }

·          Exactly-once Kafka Static Consumer via Assign (One and Only One Message Delivery) - consumer registers with Kafka via a ‘assign (2) registration method call.

1)                     enable.auto.commit’ to false

2)                     don’t call  consumer.commitSync()

3)                     Register consumer to specific partition using ‘assign’ call.

4)                     On start up of the consumer seek to specific message offset by calling consumer.seek(topicPartition,offset);

5)                     Store offset in an atomic way (RDBMS or NoSQL)

6)                     Implement idempotent as an extra safety

TopicPartition topicPartition =
 registerConsumerToSpecificPartition(consumer, topic, partition);
// Read the offset for the topic and partition from external storage.
 long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
// Use seek and go to exact offset for that topic and partition.
 consumer.seek(topicPartition, offset);
 processRecords(consumer);