Tuesday, December 21, 2021

MessageDigest & threadSafety: duplicate hash values on large concurrent processing (Spark-UDF)

Issue: MessageDigest was used as a singleton in the function exposed as a UDF. As MessageDigest is not thread-safe, it produced duplicate hash values when a large volume of data was processed concurrently.

Solution: Changed the logic to create a new MessageDigest instance for each call.

See the old and new approaches highlighted below (the obsolete code can be removed).
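A minimal sketch of the two approaches, assuming SHA-256 and hex output (the object and method names here are hypothetical, not the original UDF code):

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object HashUtil {
  // Old approach (unsafe): one instance shared by every caller. digest() mutates
  // internal state, so concurrent threads can corrupt each other's results.
  private val sharedDigest: MessageDigest = MessageDigest.getInstance("SHA-256")

  def hashShared(input: String): String =
    toHex(sharedDigest.digest(input.getBytes(StandardCharsets.UTF_8)))

  // New approach: a fresh instance per call, so no state is shared across threads.
  def hashPerCall(input: String): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    toHex(digest.digest(input.getBytes(StandardCharsets.UTF_8)))
  }

  // Render the digest bytes as a lowercase hex string.
  private def toHex(bytes: Array[Byte]): String =
    bytes.map(b => "%02x".format(b)).mkString
}
```

Both functions return the same value when called from a single thread; the difference only shows up under concurrency. If creating an instance per call ever becomes too costly, a ThreadLocal holding one MessageDigest per thread is another option.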



Further reference:

how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/

need-thread-safe-messagedigest-in-java

Sunday, May 2, 2021

MySQL - implementing window-function-like logic in MySQL 5.6 (window functions are available from 8.0)

 MySQL - window function implementation without window functions

#this is needed in MySQL versions below 8.0

Scenario:

There are records with a column that looks like a primary key (it cannot be declared as a PK, so duplicates can come in), a foreign key, and a createTimestamp. We want to make sure that processing happens only once per primary-key value. If there are duplicate entries, the only way to tell them apart is the order of createTimestamp; if an entry has already been processed, its foreign-key column will have a value.

So we want to identify whether a prior foreign key was assigned, which means the entry is already processed. And if there are two entries that are not processed yet, process just the first one and mark the other one as cancelled.



SELECT id,
     crew_id,
     amount,
     CASE type 
         WHEN @curType THEN @curRow := @curRow + 1 
         ELSE @curRow := 1
     END AS rank,
     @curType := type AS type
FROM Table1 p
JOIN (SELECT @curRow := 0, @curType := '') r
ORDER BY crew_id, type


### Getting the rank based on execId & createTimestamp

SELECT foreignId, createTimestamp,
		CASE executionId 
			WHEN @curType THEN @curRow := @curRow + 1 
			ELSE @curRow := 1 
		END AS rank,
        @curType := executionId AS executionId
FROM  job_queue_prd p
JOIN (SELECT @curRow := 0, @curType := '') r
WHERE p.executionID in (204626, 204851) 
ORDER BY  executionId, createTimestamp asc;

### Getting current & prior instanceId & the rank based on execId & createTimestamp

-- @instanceId still holds the previous row's foreignId when prior_foreignId is read
SET @instanceId = 0;
SELECT createTimestamp,
       @instanceId AS prior_foreignId,
       @instanceId := foreignId AS foreignId,
       CASE executionId
           WHEN @curExecId THEN @curRow := @curRow + 1
           ELSE @curRow := 1
       END AS rank,
       @curExecId := executionId AS executionId
FROM job_queue_prd p
JOIN (SELECT @curRow := 0, @curExecId := '') r
WHERE p.executionID IN (204626, 204851)
ORDER BY executionId, createTimestamp ASC;

Saturday, April 24, 2021

GIT Repo , Repo Management Systems - BitBucket vs GitHub

 


Git is a distributed version control system.


Stash is Atlassian's repository management system - in 2015 it was renamed to Bitbucket Server. So, it is a management tool for Git.


There are many repository management systems you can use with Git. 


In the enterprise world, one of the most popular is Stash.


In the open-source world, GitHub is the most popular one as far as I know.


Ref: https://stackoverflow.com/questions/32294534/what-is-the-relationship-between-git-and-stash


Tuesday, March 30, 2021

Scala Notes

 

Builder Pattern (Scala, Java) - SparkSession uses the builder pattern

https://www.freecodecamp.org/news/how-to-solve-the-builder-patterns-boilerplate-problem-2ea97001dbe6/ 

The builder pattern is great for classes that can have complex initialization. It typically consists of setting values for multiple variables, some of which may be required with the rest being optional. 
…in which you provide a constructor with only the required parameters, another with a single optional parameter, a third with two optional parameters, and so on, culminating in a constructor with all the optional parameters.
Actually writing a builder can be quite a tedious and repetitive task.
It can be generated instead - the Lombok library (Java) generates the builder code at compile time

@Builder annotation - if applied on a class (POJO), it will create its Builder class automatically

Suppose @Builder is applied on a Person class: it will create a PersonBuilder class, and you can set the values as below

Person.builder().firstName("Adam").lastName("Savage").city("San Francisco").jobTitle("TV Personality").build();
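For comparison, a hand-written builder in Scala might look roughly like this. It is only a sketch of the kind of code a generated builder replaces; the field types, the choice of required fields, and the private variable names are assumptions for illustration:

```scala
// Hypothetical Person with two required and two optional fields.
final case class Person(
    firstName: String,
    lastName: String,
    city: Option[String] = None,
    jobTitle: Option[String] = None)

object Person {
  // Entry point mirroring the Person.builder() call style.
  def builder(): PersonBuilder = new PersonBuilder
}

final class PersonBuilder {
  private var first: Option[String] = None
  private var last: Option[String] = None
  private var cityOpt: Option[String] = None
  private var jobOpt: Option[String] = None

  // Each setter stores the value and returns the builder so calls can be chained.
  def firstName(v: String): PersonBuilder = { first = Some(v); this }
  def lastName(v: String): PersonBuilder = { last = Some(v); this }
  def city(v: String): PersonBuilder = { cityOpt = Some(v); this }
  def jobTitle(v: String): PersonBuilder = { jobOpt = Some(v); this }

  // Required fields are checked once, at build time.
  def build(): Person = {
    require(first.isDefined && last.isDefined, "firstName and lastName are required")
    Person(first.get, last.get, cityOpt, jobOpt)
  }
}
```

In idiomatic Scala, named and default parameters on a case class often make a separate builder unnecessary.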


Tail-Recursion (import scala.annotation.tailrec)

Not all recursive functions are tail-recursive: the naive factorial is not, because it has to keep the intermediate result of every call, but GCD is, because nothing has to be remembered after the recursive call.

https://www.geeksforgeeks.org/tail-recursion-in-scala/

https://www.scala-exercises.org/scala_tutorial/tail_recursion

Example: GCD aka GCF https://www.khanacademy.org/math/cc-sixth-grade-math/cc-6th-factors-and-multiples/cc-6th-gcf/v/greatest-common-divisor
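The difference can be sketched as follows (the object and helper names are illustrative). The @tailrec annotation makes the compiler reject a method that is not actually tail-recursive:

```scala
import scala.annotation.tailrec

object TailRecExamples {
  // Tail-recursive: the recursive call is the last thing that happens,
  // so the compiler can turn it into a loop.
  @tailrec
  def gcd(a: Int, b: Int): Int =
    if (b == 0) a else gcd(b, a % b)

  // Not tail-recursive: the multiplication happens after the recursive call
  // returns, so every call keeps a stack frame. Adding @tailrec here would
  // be a compile error.
  def factorial(n: Int): BigInt =
    if (n <= 1) BigInt(1) else factorial(n - 1) * n

  // Tail-recursive variant: the intermediate result travels in an accumulator.
  def factorialTail(n: Int): BigInt = {
    @tailrec
    def loop(i: Int, acc: BigInt): BigInt =
      if (i <= 1) acc else loop(i - 1, acc * i)
    loop(n, BigInt(1))
  }
}
```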

 Monad


Closures

Trait

List manipulation

var myList = List.empty[String]

 myList :+= "sree"

myList

res2: List[String] = List(sree)

Immutable by design - helps with concurrent programming
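A small sketch of what "immutable" means here (the names and values are illustrative): appending with :+= builds a new List and rebinds the var, while the original List is left untouched.

```scala
object ListDemo {
  val original: List[String] = List("sree")

  // myList starts out pointing at the same immutable List.
  var myList: List[String] = original

  // Desugars to myList = myList :+ "ram": a new List is created and the
  // var is rebound to it; `original` itself is never modified.
  myList :+= "ram"
}
```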

Option - 

Yield -  produces a value for each iteration

Null, Nil, None, Nothing

Null - the type of the null reference (absence of a value)

Nil - the empty list, which marks the end of a List

None - the value of an Option that has no value in it

Nothing - the bottom type of the entire type system, a subtype of every other type; it has no values (a method that always throws an exception has Nothing as its return type)
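A short sketch that puts these four side by side, together with Option and yield from the notes above (all names and values are made up for illustration):

```scala
object EmptyValues {
  // null: the single value of type Null, assignable to any reference type.
  val missing: String = null

  // Nil: the empty list; every List built with :: ends in it.
  val names: List[String] = "a" :: "b" :: Nil

  // Option: Some(value) when there is a result, None when there is not.
  def findUser(id: Int): Option[String] =
    if (id == 1) Some("sree") else None

  // Nothing: the result type of an expression that never returns normally.
  def fail(msg: String): Nothing = throw new IllegalStateException(msg)

  // Because Nothing is a subtype of String, fail(...) fits where a String is expected.
  def nameOrFail(id: Int): String =
    findUser(id).getOrElse(fail(s"no user $id"))

  // yield: the for-comprehension produces one value per iteration.
  val doubled: List[Int] = for (n <- List(1, 2, 3)) yield n * 2
}
```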


Extractor Object - with unapply method (match the value and take it apart)

Pattern matching - case statement

def matchValue(x: Int): String = x match {
  case 1 => "One"
  case 2 => "Two"
  case _ => "many"
}
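An extractor object can be sketched in the same style. Email and describe are hypothetical names; the point is only that unapply decides whether the value matches and, if so, takes it apart:

```scala
object ExtractorDemo {
  object Email {
    // Returns Some((user, domain)) when the string has exactly one "@"
    // with non-empty text on both sides, otherwise None.
    def unapply(s: String): Option[(String, String)] = s.split("@") match {
      case Array(user, domain) if user.nonEmpty && domain.nonEmpty => Some((user, domain))
      case _ => None
    }
  }

  // The case pattern calls Email.unapply behind the scenes.
  def describe(s: String): String = s match {
    case Email(user, domain) => s"user=$user domain=$domain"
    case _ => "not an email"
  }
}
```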

Queue - val emptyQ = new scala.collection.mutable.Queue[Int]


Thursday, February 4, 2021

Spark-Scala-Compatibility and other notes


Downloaded Scala IDE - it came with default Scala SDK v2.12
Scala IDE build of Eclipse SDK  Build id: 4.7.0-vfinal-2017-09-29T14:34:02Z-Typesafe 

Tried to use Spark 2.10 jars, but they were not compatible - it threw compatibility errors, could not find the main method, etc.

Then downloaded Spark 1.6.1 (which is the version my client used), unzipped it, and added all the jars from spark-1.6.1-bin-hadoop2.6\lib: the 3 datanucleus jars (api, core, rdbms), spark-1.6.1-yarn-shuffle, and spark-assembly-1.6.1-hadoop2.6.0.jar. The spark-assembly jar is 183 MB and contains a lot of dependent jars.
I had to change the Scala compiler for the project to Scala 2.10 to make both compatible.

Then it started running.
=============================================

import org.apache.spark.{SparkConf, SparkContext}

// Local Spark configuration; setIfMissing only applies a value when the key is not already set
val sparkConf = new SparkConf().setAppName("sparkSQLExamples").setMaster("local[*]")
    .setIfMissing("hive.execution.engine", "spark")
    .setIfMissing("spark.cassandra.connection.host", "127.0.0.1")
    .setIfMissing("spark.cassandra.connection.port", "9042")
val sparkContext = new SparkContext(sparkConf)


Spark SQLContext allows us to connect to different Data Sources to write or read data from them, but it has limitations, namely that when the program ends or the Spark shell is closed, all links to the data sources we have created are temporary and will not be available in the next session.
This limitation is solved with HiveContext, since it uses a MetaStore to store the information of those “external” tables. In our example, this MetaStore is MySQL. This configuration is included in a resource file (hive-site.xml) used by Hive.

Hive External table vs Managed Table

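External - the data is owned by the user; on table drop only the metadata gets deleted, not the data; partitions are managed by the user.

Managed - the data is owned by Hive; on table drop both the metadata and the data get deleted.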







A Scala closure is a function that uses one or more free variables and whose return value depends on those variables. The free variables are defined outside the closure and are not passed in as parameters. So the difference between a closure and a normal function is the free variable: any variable that is neither defined within the function nor passed as one of its parameters. The function itself does not bind a value to the free variable; the value comes from the enclosing scope.
Example:
If we define a function as shown below:

def example(a: Double) = a * p / 100 (here p is a free variable)

Where closures show up: UDFs, RDD and DataFrame operations
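A runnable version of that example, assuming p is a var defined in the enclosing object (the object name and the initial value are illustrative). The closure reads p each time it is called, so its result follows later changes to p:

```scala
object ClosureDemo {
  // Free variable: defined outside the function and not passed as a parameter.
  var p: Double = 10.0

  // The function value closes over p and reads its current value on every call.
  val example: Double => Double = a => a * p / 100
}
```

Calling ClosureDemo.example(200.0) gives 20.0 while p is 10.0; after setting p to 50.0, the same call gives 100.0.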



@transient lazy val

Suppose you have a Scala object holding some data that you want to store or send around by serializing the object. It turns out that the object is also capable of performing some complex logic, and it stores the results of these calculations in its field values. While it might be efficient to keep the calculation results in memory for later lookup, it might be a bad idea to also serialize these fields, as this will consume space you do not want to sacrifice or increase network traffic (e.g., in Spark), costing more time than it takes to recalculate the fields. Now one could write a custom serializer for this task, but let us be honest: that is not really the thing we want to spend our time on.

This is where the @transient lazy val pattern comes in. In Scala lazy val denotes a field that will only be calculated once it is accessed for the first time and is then stored for future reference. With @transient on the other hand one can denote a field that shall not be serialized.

Putting this together we can now write our "recalculate rather than serialize logic":

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
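A minimal sketch of the pattern, using plain Java serialization instead of Spark (the Report class and the roundTrip helper are hypothetical names for illustration, not code from the linked article):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Report(val values: Vector[Int]) extends Serializable {
  // Not written to the serialized form; computed on first access and
  // computed again on first access after deserialization.
  @transient lazy val total: Long = values.map(_.toLong).sum
}

object TransientLazyDemo {
  // Serializes the object to a byte array and reads it back,
  // which is roughly what happens when it is shipped over the network.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After TransientLazyDemo.roundTrip(new Report(Vector(1, 2, 3))), the copy still holds values, and total is recalculated the first time it is read.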


So @transient lazy val, as used in a UDF's utility class, tells the execution engine: don't serialize the variable (say, a cryptoLib object) while shipping the object behind the UDF (the util class with the function exposed as the UDF closure) to the executors, and once the object is on a Spark executor, create the variable only once ("lazy"), on first use, and reuse it for all records.


AWS Dashboards

 

Decide what data goes into each dashboard based on who will use the dashboard and why they will use the dashboard.

https://aws.amazon.com/builders-library/building-dashboards-for-operational-visibility/ 


A very common tendency when designing dashboards is to overestimate or underestimate the domain knowledge of the target user. 

It’s easy to build a dashboard that makes total sense to its creator. 

However, this dashboard might not provide value to users. 

We use the technique of working backwards from the customer (in this case, the dashboard users) to eliminate this risk. 

Define KPIs (Key Performance Indicators) - these need to be identified together with the dashboard users.

Before diving headfirst into dashboard design, sit with your end-users in order to gather requirements and define KPIs. Without doing that, you can design the most beautiful dashboard in the world, but it won’t change the way users make decisions in the long run.





Python note: Boto/Boto3 is the Amazon Web Services (AWS) SDK for Python.
It enables Python developers to create, configure, and manage AWS services.


4 Key factors about dashboard design




  • Relationship: connection between two or more variables
  • Comparison: compare two or more variables side by side
  • Composition: breaking data into separate components
  • Distribution: range and grouping of values within data

Data Pipeline -