Tuesday, December 21, 2021

MessageDigest & thread safety: duplicate hash values under large concurrent processing (Spark UDF)

Issue: MessageDigest was used as a singleton in the function exposed as a UDF. Since MessageDigest is not thread-safe, it caused duplicate hash values when a large volume of data was processed concurrently.

Solution: changed the logic to initialize a new MessageDigest object for each call.

See below the old and new approaches (the obsolete code can be removed).
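A minimal sketch of the two approaches, assuming a simple SHA-256 hashing UDF (object names and column handling are illustrative, not the original code):

import java.security.MessageDigest
import org.apache.spark.sql.functions.udf

// BEFORE (problem): one shared MessageDigest instance reused by every task thread.
// MessageDigest keeps internal state, so concurrent update/digest calls interfere
// and can emit wrong or duplicate hash values.
object UnsafeHashUdf {
  private val md = MessageDigest.getInstance("SHA-256")   // shared, not thread-safe
  val hash = udf { s: String =>
    md.digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }
}

// AFTER (fix): create a fresh MessageDigest for each call, so no state is shared.
object SafeHashUdf {
  val hash = udf { s: String =>
    val md = MessageDigest.getInstance("SHA-256")          // new instance per call
    md.digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }
}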



Further reference:

how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/

need-thread-safe-messagedigest-in-java

Sunday, May 2, 2021

MySQL - implement window-function-like logic in MySQL 5.6 (window functions are available from 8.0)

MySQL - WINDOW FUNCTION implementation without window functions

# this is needed in MySQL versions below 8.0

Scenario:

There are records with a seemingly primary-key column (it can't be made a PK, so duplicates can come in), a ForeignKey and a createTimestamp --> we want to make sure that processing happens only once per primaryKey value. If there are duplicate entries, the only way to tell them apart is the order of the create timestamp, and if a row has already been processed, its ForeignKey column will have a value.

So we want to identify whether a prior ForeignKey was already assigned - which means the row is already processed - AND, if there are two entries that are not yet processed, make sure to process only the first one and mark the other one as cancelled.



SELECT id,
     crew_id,
     amount,
     CASE type 
         WHEN @curType THEN @curRow := @curRow + 1 
         ELSE @curRow := 1
     END AS rank,
     @curType := type AS type
FROM Table1 p
JOIN (SELECT @curRow := 0, @curType := '') r
ORDER BY crew_id, type


### Getting the rank based on execId & createTimestamp

SELECT foreignId, createTimestamp,
		CASE executionId 
			WHEN @curType THEN @curRow := @curRow + 1 
			ELSE @curRow := 1 
		END AS rank,
        @curType := executionId AS executionId
FROM  job_queue_prd p
JOIN (SELECT @curRow := 0, @curType := '') r
WHERE p.executionID in (204626, 204851) 
ORDER BY  executionId, createTimestamp asc;

### Getting current & prior instanceId & the rank based on execId & createTimestamp

SET @priorForeignId = 0;

SELECT foreignId, createTimestamp,
       @priorForeignId AS prior_foreignId,
       @priorForeignId := foreignId AS foreignId,
       CASE executionId
           WHEN @curExecId THEN @curRow := @curRow + 1
           ELSE @curRow := 1
       END AS rank,
       @curExecId := executionId AS executionId
FROM job_queue_prd p
JOIN (SELECT @curRow := 0, @curExecId := '') r
WHERE p.executionId IN (204626, 204851)
ORDER BY executionId, createTimestamp ASC;

Saturday, April 24, 2021

Git Repo, Repo Management Systems - Bitbucket vs GitHub

 


Git is a distributed version control system.


Stash is Atlassian's repository management system - in 2015 it was renamed to Bitbucket - so it is a management tool for Git.


There are many repository management systems you can use with Git. 


One of the most popular in the enterprise world is Stash.


But in the open-source world, GitHub is the most popular one, as far as I know.


Ref: https://stackoverflow.com/questions/32294534/what-is-the-relationship-between-git-and-stash


Tuesday, March 30, 2021

Scala Notes

 

Builder Pattern (Scala, Java) - SparkSession uses the builder pattern

https://www.freecodecamp.org/news/how-to-solve-the-builder-patterns-boilerplate-problem-2ea97001dbe6/ 

The builder pattern is great for classes that can have complex initialization. It typically consists of setting values for multiple variables, some of which may be required with the rest being optional. 
The alternative is the telescoping-constructor pattern, in which you provide a constructor with only the required parameters, another with a single optional parameter, a third with two optional parameters, and so on, culminating in a constructor with all the optional parameters.
Actually writing a builder can be quite a tedious and repetitive task.
It can be generated automatically using the Lombok library, which does the code generation.

@Builder annotation - if applied to a class (POJO), it will create the Builder class automatically

Suppose @Builder is applied on a Person class: it will create a PersonBuilder class and you can use it as below

Person.builder().firstName("Adam").lastName("Savage") .city("San Francisco").jobTitle("TV Personality").build(); 
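On the Scala/Spark side, SparkSession.builder follows the same shape - a minimal sketch (the app name and configs are placeholders):

import org.apache.spark.sql.SparkSession

// Each setter returns the builder itself, and getOrCreate() plays the role of build().
val spark = SparkSession.builder()
  .appName("builder-pattern-demo")
  .master("local[*]")                              // optional setting
  .config("spark.sql.shuffle.partitions", "8")     // another optional setting
  .getOrCreate()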


Tail-Recursion (import scala.annotation.tailrec)

Not all recursive functions are tail-recursive (factorial is not, because it has to keep an intermediate result after the recursive call - but GCD is, because it does not have to remember anything).

https://www.geeksforgeeks.org/tail-recursion-in-scala/

https://www.scala-exercises.org/scala_tutorial/tail_recursion

Example: GCD aka GCF https://www.khanacademy.org/math/cc-sixth-grade-math/cc-6th-factors-and-multiples/cc-6th-gcf/v/greatest-common-divisor
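A quick sketch contrasting the two cases:

import scala.annotation.tailrec

// GCD: the recursive call is the last thing the function does, so it is tail-recursive;
// @tailrec makes the compiler fail the build if it cannot rewrite the recursion as a loop.
@tailrec
def gcd(a: Int, b: Int): Int =
  if (b == 0) a else gcd(b, a % b)

// Factorial (naive): NOT tail-recursive - after the recursive call returns,
// it still has to multiply by n, so the intermediate result must be kept.
def factorial(n: Int): Long =
  if (n <= 1) 1L else n * factorial(n - 1)

gcd(48, 36)   // 12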

 Monad


Closures

Trait

List manipulation

var myList = List.empty[String]
myList :+= "sree"
myList            // res2: List[String] = List(sree)

Immutable by design - helps with concurrent programming

Option - a container that either holds a value (Some(value)) or is empty (None)

Yield - in a for-comprehension, produces a value for each iteration and collects them into a new collection (see the sketch below)
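A small sketch of both (the PORT environment variable is just an illustration):

// yield collects the value produced by each iteration into a new collection
val squares = for (n <- List(1, 2, 3)) yield n * n              // List(1, 4, 9)

// Option models a possibly-missing value without using null
val maybePort: Option[Int] = sys.env.get("PORT").map(_.toInt)   // Some(...) or None
val port: Int = maybePort.getOrElse(8080)                       // default when absent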

Null, Nil, None, Nothing

Null - the type of the null reference (absence of a value)

Nil - the empty list (every List ends in Nil)

None - the value of an Option which has no value in it

Nothing - the lowest (bottom) type in the entire type system; it is a subtype of every other type (a method that only throws an exception has Nothing as its return type)
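In code, the four look like this:

val s: String = null                  // null reference; its type is Null
val empty: List[Int] = Nil            // Nil: the empty list
val missing: Option[Int] = None       // None: an Option holding no value
def fail(msg: String): Nothing =      // Nothing: bottom type, never returns normally
  throw new IllegalArgumentException(msg)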


Extractor Object - an object with an unapply method (it matches a value and takes it apart)
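A minimal sketch with a hypothetical Email extractor:

// unapply matches the value and takes it apart, so Email(...) can be used as a pattern
object Email {
  def unapply(s: String): Option[(String, String)] = s.split("@") match {
    case Array(user, domain) => Some((user, domain))
    case _                   => None
  }
}

"sree@example.com" match {
  case Email(user, domain) => s"user=$user, domain=$domain"
  case _                   => "not an email"
}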

Pattern matching - case statement

def matchValue(x: Int): String = x match {
  case 1 => "One"
  case 2 => "Two"
  case _ => "many"
}

 Queue  val emptyQ = new scala.collection.mutable.Queue[Int] 


Thursday, February 4, 2021

Spark-Scala-Compatibility and other notes


Downloaded Scala IDE - it came with default Scala SDK v2.12
Scala IDE build of Eclipse SDK  Build id: 4.7.0-vfinal-2017-09-29T14:34:02Z-Typesafe 

Tried to use Spark jars built for Scala 2.10; however, they were not compatible - they threw compatibility errors (could not find the main method, etc.)

Then downloaded Spark 1.6.1 (the version my client used), unzipped it, and added all the jars from spark-1.6.1-bin-hadoop2.6\lib: the 3 datanucleus jars (api, core, rdbms), spark-1.6.1-yarn-shuffle, and spark-assembly-1.6.1-hadoop2.6.0.jar - the spark-assembly jar is 183 MB and contains a lot of dependent jars.
I had to change the project's Scala compiler to Scala 2.10 to make both compatible.

Then it started running.
=============================================

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("sparkSQLExamples").setMaster("local[*]")
    .setIfMissing("hive.execution.engine", "spark")
    .setIfMissing("spark.cassandra.connection.host", "127.0.0.1")
    .setIfMissing("spark.cassandra.connection.port", "9042")
val sparkContext = new SparkContext(sparkConf)


Spark SQLContext allows us to connect to different data sources to read or write data, but it has a limitation: when the program ends or the Spark shell is closed, all the links to the data sources we have created are temporary and will not be available in the next session.
This limitation is solved with HiveContext, since it uses a MetaStore to store the information of those "external" tables. In our example, this MetaStore is MySQL. The configuration is included in a resource file (hive-site.xml) used by Hive.
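A minimal sketch, reusing the sparkContext created above (assumes the spark-hive jars and a hive-site.xml on the classpath):

import org.apache.spark.sql.hive.HiveContext

// HiveContext (Spark 1.6.x) stores table definitions in the Hive metastore
// (MySQL in this setup), so tables registered here survive into the next session.
val hiveContext = new HiveContext(sparkContext)
hiveContext.sql("SHOW TABLES").show()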

Hive External table vs Managed Table

External - the data is owned by the user; on DROP TABLE only the metadata is deleted, not the data; partitions are managed by the user.
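For example, continuing with the hiveContext above (table name, columns and location are hypothetical):

// Dropping an EXTERNAL table removes only the metastore entry;
// the files under LOCATION stay where they are.
hiveContext.sql(
  """CREATE EXTERNAL TABLE IF NOT EXISTS events (id INT, payload STRING)
    |PARTITIONED BY (dt STRING)
    |LOCATION '/data/events'
  """.stripMargin)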







Scala closures are functions that use one or more free variables, and the return value of the function depends on those variables. A free variable is defined outside the closure and is not passed to it as a parameter, so it is not bound inside the function, yet the function still refers to it. That free variable is what distinguishes a closure from a normal function.
Example:
If we define a function as shown below:

def example(a: Double) = a * p / 100   // here p is a free variable

Examples in Spark: udf, rdd, and dataframe operations (their closures, including any captured free variables, are serialized and shipped to the executors).
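A runnable version of the closure idea:

// p is the free variable: defined outside the function and not passed as a parameter
var p = 10.0
val example = (a: Double) => a * p / 100   // the function "closes over" p

example(200)   // 20.0
p = 25.0
example(200)   // 50.0 - the closure sees the current value of p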



@transient lazy val

Say you have a Scala object holding some data that you want to store or send around by serializing it. The object can also perform some complex logic and stores the results of those calculations in its fields. While it may be efficient to keep the calculated results in memory for later lookup, it can be a bad idea to also serialize those fields: they consume space you do not want to sacrifice, or they increase network traffic (e.g., in Spark), costing more time than simply recalculating the fields would. One could write a custom serializer for this, but let's be honest: that's not really what we want to spend our time on.

This is where the @transient lazy val pattern comes in. In Scala, lazy val denotes a field that is only calculated when it is accessed for the first time and is then stored for future reference. @transient, on the other hand, denotes a field that shall not be serialized.

Putting this together, we can now write our "recalculate rather than serialize" logic:

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/


So @transient lazy val, as used in the UDF's utility class, tells the execution engine: don't serialize the variable (say, the cryptoLib object) while broadcasting the to-be-UDF object (the util class with the function to expose as a UDF closure), and once the object is at the Spark executor, create it only once ("lazy") for all records.
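A minimal sketch of the pattern, with a hypothetical CryptoLib standing in for the real crypto object:

// Stand-in for a heavyweight, non-serializable helper (hypothetical).
class CryptoLib {
  def encrypt(s: String): String = s.reverse   // placeholder logic for the sketch
}

// Utility class whose method is exposed as a UDF. The instance is pulled into the
// task closure and serialized - except the @transient field. Each executor then
// builds cryptoLib exactly once, lazily, on the first record it processes.
class CryptoUdfUtil extends Serializable {
  @transient lazy val cryptoLib: CryptoLib = new CryptoLib
  def encryptValue(s: String): String = cryptoLib.encrypt(s)
}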























AWS Dashboards

 

Decide what data goes into each dashboard based on who will use the dashboard and why they will use the dashboard.

https://aws.amazon.com/builders-library/building-dashboards-for-operational-visibility/ 


A very common tendency when designing dashboards is to overestimate or underestimate the domain knowledge of the target user. 

It’s easy to build a dashboard that makes total sense to its creator. 

However, this dashboard might not provide value to users. 

We use the technique of working backwards from the customer (in this case, the dashboard users) to eliminate this risk. 

Define KPIs - Key Performance Indicators - which need to be identified with the dashboard users.

Before diving headfirst into dashboard design, sit with your end-users in order to gather requirements and define KPIs. Without doing that, you can design the most beautiful dashboard in the world, but it won’t change the way users make decisions in the long run.





Python note: Boto/Boto3 is the Amazon Web Services (AWS) SDK for Python.
It enables Python developers to create, configure, and manage AWS services.


4 Key factors about dashboard design




  • Relationship: connection between two or more variables
  • Comparison: compare two or more variables side by side
  • Composition: breaking data into separate components
  • Distribution: range and grouping of values within data

Data Pipeline - 


















Data Engineering | DW Design (Star/Snowflake) | NF | AWS Monitoring | Graphs-Charts

 

Data Engineering: Raw data to useful info

Role of Data Engineer, skills to have:

Data Modeling: simplify a complex software design by breaking it into simpler parts - provide a visual representation

Design Schemas: Star & Snowflake

Structured & Unstructured Data

Reference link1

Big Data 4 Vs (Volume, Velocity, Variety, Veracity)

 

FSCK - File System Check - checks files for discrepancies (in HDFS)

Job tracker, task tracker, Name node ports: 50030, 50060, 50070 respectively 

Hive Metastore: stores schema and table definitions/mappings - kept in an RDBMS

spark.sql.warehouse.dir is a static configuration property that sets Hive’s hive.metastore.warehouse.dir property, i.e. the location of default database for the Hive warehouse.
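A minimal sketch of setting it when building a SparkSession (the warehouse path is an assumption):

import org.apache.spark.sql.SparkSession

// spark.sql.warehouse.dir is read when the session starts; it becomes the base
// location of the default database in the Hive warehouse.
val spark = SparkSession.builder()
  .appName("warehouse-dir-demo")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .enableHiveSupport()
  .getOrCreate()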

Hive Collections - Array, Map, Struct, Union  - refer link

SerDe - Serializer (object to storage format) / Deserializer (stored format back to the object)

.hiverc - for initialization

*args & **kwargs (Python - variable-argument passing)









AWS Monitoring - starts

System Status check

A system status check failure indicates a problem with the AWS systems that your instance runs on. 

Check for any outage, wait for AWS to resolve it, or stop and restart (or terminate and replace) the instance.

https://aws.amazon.com/premiumsupport/knowledge-center/ec2-windows-system-status-check-fail/

 Instance status check

Determine whether Amazon EC2 has detected any problems that might prevent your instances from running applications

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/monitoring-system-instance-status-check.html

Monitor your instances using CloudWatch

Collects and processes raw data from Amazon EC2 into readable, near real-time metrics. These statistics are recorded for a period of 15 months

By default, Amazon EC2 sends metric data to CloudWatch in 5-minute periods. To send metric data for your instance to CloudWatch in 1-minute periods, you can enable detailed monitoring on the instance.

https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch.html

Amazon EventBridge (formerly called Amazon CloudWatch Events)

Serverless event bus service that makes it easy to connect your applications with data from a variety of sources. 
EventBridge delivers a stream of real-time data from your own applications, Software-as-a-Service (SaaS) applications, and AWS services and routes that data to targets such as AWS Lambda.

Events, Rules, Targets, Event Bus

https://docs.aws.amazon.com/eventbridge/latest/userguide/what-is-amazon-eventbridge.html 

CloudWatch Agent 

Collecting Metrics and Logs from Amazon EC2 Instances and On-Premises Servers with the CloudWatch Agent

https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Install-CloudWatch-Agent.html

AWS Monitoring - ends

Ad hoc topic - EFS - shared access; size auto-scales to petabytes on demand.

Amazon Elastic File System (Amazon EFS) provides a simple, scalable, fully managed elastic NFS file system for use with AWS Cloud services and on-premises resources. 

It is built to scale on demand to petabytes without disrupting applications, growing and shrinking automatically as you add and remove files, eliminating the need to provision and manage capacity to accommodate growth. 

Amazon EFS is designed to provide massively parallel shared access to thousands of Amazon EC2 instances, enabling your applications to achieve high levels of aggregate throughput and IOPS with consistent low latencies. 

 

Data Warehouse design - Star Schema, Snowflake Schema (Fact & Dimension table)

    Star Schema (de-normalized dimension tables)

link1 link2

Fact tables have measures ----> dimension tables give more context to the fact table

Star


Snowflake Schema

 

Star-vs-Snowflake


Primary Key - Unique Key

| Parameter | PRIMARY KEY | UNIQUE KEY |
| --- | --- | --- |
| Basic | Serves as a unique identifier for each row in a table | Uniquely determines a row which isn't the primary key |
| NULL value acceptance | Cannot accept NULL values | Can accept one NULL value |
| Number of keys per table | Only one primary key | More than one unique key |
| Index | Creates a clustered index | Creates a non-clustered index |

NORMAL FORMS (1NF, 2NF, 3NF) - Eliminating Data Redundancy (link1)

1NF - Single column cannot have multiple values (Atomicity)
            modified to below format for atomicity

2NF = 1NF + the table should not have partial dependencies (no non-prime attribute depends on only a part of a candidate key)



Here office location depends only on the department Id (a partial dependency) - so split it into a separate table.

3NF = 2NF + no transitive dependency (no non-prime attribute depends on another non-prime attribute)

StudentId determines subject via subjectId --> transitive dependency (see below) 

Boyce-Codd NF (3.5NF) - every determinant must be a super key (stricter than 3NF, which still allows a prime attribute to depend on a non-prime attribute)

 
Professor is a non-prime attribute and not a super key, yet the prime attribute (subject) depends on it - so the table is in 3NF but violates BCNF.