Monday, August 22, 2022

Yarn/Spark Log aggregation - log4j, RootLogger, Appenders, yarn.log-aggregation-enable

 

Reference: https://medium.com/@iacomini.riccardo/spark-logging-configuration-in-yarn-faf5ba5fdb01

In Cloudera, yarn.log-aggregation-enable is enabled by default.

yarn-site.xml

<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
<value>3600</value>
</property>


Friday, August 19, 2022

KafkaProducer and Spark Streaming - things to remember - auth keystore load & file ulimit - too many open files error

 

When a Spark Streaming or batch process writes to a Kafka topic:

Be aware of the authentication done by the send/doSend methods of org.apache.kafka.clients.producer.KafkaProducer, which load the keystore/keytab for authentication for each message published.

This means the keytab is read from the file system many times. As the message count increases, the number of open file handles grows and can exceed the ulimit (maximum number of open file handles), causing a "too many open files" error and a failed job.

Make sure to publish the RDD in a distributed way.

Tuesday, December 21, 2021

MessageDigest & threadSafety: duplicate hash values on large concurrent processing (Spark-UDF)

Issue: MessageDigest was used as a singleton in this UDF function. As MessageDigest is not thread-safe, it produced duplicate hash values when a large volume of data was processed concurrently.

Solution: Changed the logic to initialize a new MessageDigest object on each call.

The old and new approaches are highlighted below (obsolete code can be removed).
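
The per-call fix can be sketched in Java as follows. This is an illustration only, not the original UDF code: the class name HashUdf, the method name sha256Hex and the choice of SHA-256 are assumptions. A new MessageDigest is obtained on every call, so no digest state is shared between threads.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; the name and the SHA-256 algorithm are assumptions.
final class HashUdf {

    // Creates a fresh MessageDigest per call instead of sharing a singleton,
    // so concurrent callers never touch the same internal digest state.
    static String sha256Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b)); // two lowercase hex chars per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this is not expected.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Creating the object per call costs a little more than reusing one instance, but it removes the shared mutable state that caused the duplicate hashes.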



Further reference:

how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/

need-thread-safe-messagedigest-in-java

Sunday, May 2, 2021

MySQL - implement window-function-like logic in MySQL 5.6 (window functions are available from 8.0)

MySQL - window function implementation without window functions

# This is needed in MySQL versions below 8.0

Scenario:

There are records with a seemingly primary-key column (it cannot be made a PK, so duplicates can come in), a ForeignKey and a createTimestamp. We want to make sure that processing happens only once per primaryKey. If there are duplicate entries, the only way to tell them apart is the order of createTimestamp; if an entry has already been processed, its ForeignKey column will have a value.

So we want to identify whether a prior ForeignKey was assigned, which means the entry has already been processed. And if there are two entries that are not processed yet, make sure to process only the first one and mark the other one as cancelled.



SELECT id,
     crew_id,
     amount,
     CASE type 
         WHEN @curType THEN @curRow := @curRow + 1 
         ELSE @curRow := 1
     END AS rank,
     @curType := type AS type
FROM Table1 p
JOIN (SELECT @curRow := 0, @curType := '') r
ORDER BY crew_id, type


### Getting the rank based on execId & createTimestamp

SELECT foreignId, createTimestamp,
		CASE executionId 
			WHEN @curType THEN @curRow := @curRow + 1 
			ELSE @curRow := 1 
		END AS rank,
        @curType := executionId AS executionId
FROM  job_queue_prd p
JOIN (SELECT @curRow := 0, @curType := '') r
WHERE p.executionID in (204626, 204851) 
ORDER BY  executionId, createTimestamp asc;

### Getting current & prior foreignId & the rank based on execId & createTimestamp

SET @foreignId = 0;
SELECT createTimestamp,
       @foreignId AS prior_foreignId,
       @foreignId := foreignId AS foreignId,
       CASE executionId
           WHEN @curExecId THEN @curRow := @curRow + 1
           ELSE @curRow := 1
       END AS rank,
       @curExecId := executionId AS executionId
FROM job_queue_prd p
JOIN (SELECT @curRow := 0, @curExecId := '') r
WHERE p.executionId IN (204626, 204851)
ORDER BY executionId, createTimestamp ASC;

Saturday, April 24, 2021

Git Repo, Repo Management Systems - BitBucket vs GitHub

 


Git is a distributed version control system.

Stash is Atlassian's repository management system; in 2015 it was renamed to Bitbucket Server. So it is a management tool for Git.

There are many repository management systems you can use with Git.

One of the most popular in the enterprise world is Stash.

But in the open source world, GitHub is the most popular one as far as I know.


Ref: https://stackoverflow.com/questions/32294534/what-is-the-relationship-between-git-and-stash


Tuesday, March 30, 2021

Scala Notes

 

Builder Pattern (Scala, Java) - SparkSession uses the builder pattern

https://www.freecodecamp.org/news/how-to-solve-the-builder-patterns-boilerplate-problem-2ea97001dbe6/ 

The builder pattern is great for classes that can have complex initialization. It typically consists of setting values for multiple variables, some of which may be required with the rest being optional. 
…in which you provide a constructor with only the required parameters, another with a single optional parameter, a third with two optional parameters, and so on, culminating in a constructor with all the optional parameters.
Actually writing a builder can be quite a tedious and repetitive task.
The Lombok library can do this automatically: it generates the builder code at compile time.

@Builder annotation - if applied on a class (POJO), it will create its Builder class automatically

Suppose @Builder is applied on a Person class: it will create a PersonBuilder class, and you can set values as below

Person.builder().firstName("Adam").lastName("Savage").city("San Francisco").jobTitle("TV Personality").build();
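
For comparison, here is roughly the boilerplate a hand-written builder needs for the same Person class. It is a sketch, not Lombok's exact output: the four fields come from the call above, while the getters and the private constructor are assumptions added so the example is self-contained.

```java
// Hand-written sketch of a builder for Person; Lombok's @Builder would
// generate something similar so that none of this has to be typed out.
final class Person {
    private final String firstName;
    private final String lastName;
    private final String city;
    private final String jobTitle;

    // Only the builder can construct a Person.
    private Person(PersonBuilder b) {
        this.firstName = b.firstName;
        this.lastName = b.lastName;
        this.city = b.city;
        this.jobTitle = b.jobTitle;
    }

    static PersonBuilder builder() {
        return new PersonBuilder();
    }

    // Getters are an assumption for this demo (Lombok needs @Getter for these).
    String getFirstName() { return firstName; }
    String getLastName() { return lastName; }
    String getCity() { return city; }
    String getJobTitle() { return jobTitle; }

    static final class PersonBuilder {
        private String firstName;
        private String lastName;
        private String city;
        private String jobTitle;

        // Each setter returns the builder itself so calls can be chained.
        PersonBuilder firstName(String v) { this.firstName = v; return this; }
        PersonBuilder lastName(String v) { this.lastName = v; return this; }
        PersonBuilder city(String v) { this.city = v; return this; }
        PersonBuilder jobTitle(String v) { this.jobTitle = v; return this; }

        Person build() {
            return new Person(this);
        }
    }
}
```

Any field that is not set on the builder simply stays null, which is how optional values are handled without a separate constructor per combination.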


Tail-Recursion (import scala.annotation.tailrec)

Not all recursive functions are tail-recursive: the naive factorial is not, because it has to keep the intermediate result for the pending multiplication, but GCD is, because it does not have to remember anything after the recursive call.

https://www.geeksforgeeks.org/tail-recursion-in-scala/

https://www.scala-exercises.org/scala_tutorial/tail_recursion

Example: GCD aka GCF https://www.khanacademy.org/math/cc-sixth-grade-math/cc-6th-factors-and-multiples/cc-6th-gcf/v/greatest-common-divisor

Monad


Closures

Trait

List manipulation

var myList = List.empty[String]

 myList :+= "sree"

myList

res2: List[String] = List(sree)

Immutable by design - helps with concurrent programming

Option - 

yield - produces a value for each iteration of a for expression

Null, Nil, None, Nothing

Null - absence of value

Nil - end of list

None - value of an Option which has no value in it

Nothing - the bottom type of the entire type system: a subtype of every other type, with no values of its own (a method that always throws an exception has Nothing as its return type)


Extractor Object - with unapply method (match the value and take it apart)

Pattern matching - case statement

def matchValue(x: Int): String = x match {
  case 1 => "One"
  case 2 => "Two"
  case _ => "many"
}

Queue - val emptyQ = new scala.collection.mutable.Queue[Int]