Jun 26, 2016

Apache Zeppelin provides a web UI in which you can iteratively build Spark scripts in Scala, Python, and other languages (with autocomplete support), run Spark SQL queries against Hive or another store, and visualize the results of queries or Spark DataFrames. It is akin to what IPython notebooks do for Python. Spark developers know that building, testing, and fixing errors in Spark scripts can be a lengthy process, and a dull one because it is not interactive. With Apache Zeppelin you can build and test portions of a script iteratively, which shortens that cycle considerably.

Installing and Configuring Apache Zeppelin

Ensure the following prerequisites are installed, then build and configure Zeppelin
  • Java 8: su -c "yum install java-1.8.0-openjdk-devel"
  • Maven 3.1.x+: sudo yum install apache-maven, and then link it with sudo ln -s /usr/share/apache-maven/bin/mvn /usr/bin/mvn. If this does not work for you, you can install it the following way.
    wget http://www.eu.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz
    sudo tar -zxf apache-maven-3.3.3-bin.tar.gz -C /usr/local/
    sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn
  • Git: sudo yum install git
  • NPM: sudo yum install nodejs npm
  • Either download the source code from here or clone the Git repository into a folder with git clone https://github.com/apache/incubator-zeppelin.git
  • Build from source: go to the incubator-zeppelin directory and run the following command from it.
    mvn clean package -Pspark-1.5 -Ppyspark -Dhadoop.version=2.6.0-cdh5.5.0 -Phadoop-2.6 -Dmaven.test.skip=true
    This command works for version 5.5 of the Cloudera distribution; make sure the Hadoop and Spark versions match your cluster. In addition to building support for Spark, this command builds Zeppelin with support for PySpark.
  • To configure access to the Hive metastore, copy hive-site.xml to the conf directory under zeppelin.
  • In the conf folder, create copies of zeppelin-env.sh.template and zeppelin-site.xml.template named zeppelin-env.sh and zeppelin-site.xml respectively.
  • If you would like to change the port for Zeppelin, change the value of the zeppelin.server.port property in zeppelin-site.xml. It is the property with the following description.
      <description>Server port.</description>
  • To start Zeppelin, run ./zeppelin-daemon.sh start from the bin directory. You can then access the Zeppelin UI at http://localhost:8999 [1]
  • To stop Zeppelin, run ./zeppelin-daemon.sh stop

Running Spark SQL queries against Hive and Visualizing Results

In a Zeppelin cell, type %hive on the first line to activate the interpreter with HiveQL support. You can then run a query, and visualization support is activated automatically in the output. To execute the cell, press Shift+Enter.

Building Scala scripts and plotting model outputs

You can also code in Scala or Python by activating the corresponding interpreter. The Scala Spark interpreter is active by default for a cell.
To visualize a Spark DataFrame, use the z.show(df) command.

Writing documentation

Activate Markdown support in a cell by using %md. You can then add documentation alongside your code. Unfortunately, support for LaTeX is not there yet, but it should arrive in future releases.

What's missing ?

Unlike IPython notebooks, there is no option to export to HTML or PDF (using LaTeX). Support for embedding LaTeX expressions is also missing, but these features should be added in future releases.


Although certain features are missing, Apache Zeppelin increases your productivity by shortening the build, test, and fix cycle. It also provides good visualization capabilities for your queries and DataFrames.
[1] This assumes the port was changed to 8999; the default is 8080.

Posted on Sunday, June 26, 2016 by Ramandeep Singh Nanda

Apache Spark has an advanced DAG execution engine and supports in-memory computation. In-memory computation combined with DAG execution leads to far better performance than running MapReduce jobs. In this post, I will show an example of using linear regression with Apache Spark. The dataset is the NYC Yellow Taxi dataset for a particular month in 2015, filtered to extract the records for a single day.
This example uses HiveContext [1], an instance of the Spark SQL execution engine that integrates with the Hive data store. The dataset has the following features.
Feature Name                                                | Feature Data Type
duration (journey_end_time - journey_start_time)            | Double
store_and_forward_flag (categorical, requires conversion)   | String "Y/N"
ratecodeid (categorical, requires conversion)               | Int
fare_amount (target variable)                               | Double
We want to predict fare_amount given the set of features. Because fare is a continuous variable, predicting it requires a regression model.

Things to consider:

  • To load the data into a DataFrame, we must first query the Hive store using the hiveCtxt.sql() method. We can drop invalid records using na.drop() [2] on the resulting DataFrame and then cache it with the cache() method for later use.
  • The two categorical variables need to be converted to a vector representation. This is done using StringIndexer and OneHotEncoder. Look at the method preprocessFeatures() in the code below.
  • Models can be saved by serializing them with sc.parallelize(Seq(model),1).saveAsObjectFile("nycyellow.model") and loaded again by deserializing them with sc.objectFile[CrossValidatorModel]("nycyellow.model").first(). Newer versions of the Spark API support this out of the box, and using those methods is recommended.
  • Data can be split into training and test sets using the randomSplit() method on the DataFrame. If you are using cross validation, however, it is recommended to train the model on the entire sample dataset.
  • The features in the DataFrame must be combined into a single vector column using VectorAssembler, and that column should be named features. The target variable should be renamed to label; you can use the withColumnRenamed() function to do so.
  • Cross validation can be performed using CrossValidator, and the estimator can be set with setEstimator(). Fitting it returns a CrossValidatorModel.
  • The evaluator chosen depends on whether you are doing classification or regression. In this case, we use RegressionEvaluator.
  • You can specify different values for parameters such as the regularization parameter and the number of iterations; CrossValidator tries each combination and picks the best set of parameters for your model.
  • After this you can fit the model on the dataset and evaluate its performance. Because we are testing the accuracy of a regression model, we can use RegressionMetrics to compare the predicted fare with the actual fare. The measures that can be used include R-squared (r2) and mean absolute error (MAE).
  • For new predictions the saved model can be reused. The new data needs to be transformed into the same format that was used to train the model. To do so, we must first create a DataFrame, using StructType to specify its structure, and then preprocess the features the same way by invoking the preprocessFeatures() method.
  • The data can be visualized using Apache Zeppelin [3].
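
The indexing and one-hot step from the list above can be illustrated without Spark. The following is a minimal plain-Java sketch of the idea, not the Spark API: the class and method names (CategoricalEncoding, fitIndex, oneHot) are hypothetical, the alphabetical tie-break is an assumption of the sketch, and dropping the last category mirrors what I understand to be the default behaviour of Spark's OneHotEncoder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of index-then-one-hot encoding; not the Spark API.
final class CategoricalEncoding {

    // Assigns index 0 to the most frequent label, 1 to the next, and so on.
    // Ties are broken alphabetically (an assumption of this sketch).
    static Map<String, Integer> fitIndex(List<String> labels) {
        Map<String, Integer> counts = new HashMap<>();
        for (String label : labels) {
            counts.merge(label, 1, Integer::sum);
        }
        List<String> ordered = new ArrayList<>(counts.keySet());
        ordered.sort((a, b) -> {
            int byCount = Integer.compare(counts.get(b), counts.get(a));
            return byCount != 0 ? byCount : a.compareTo(b);
        });
        Map<String, Integer> index = new LinkedHashMap<>();
        for (String label : ordered) {
            index.put(label, index.size());
        }
        return index;
    }

    // Encodes an index as a vector of length (categories - 1);
    // the last category is represented by the all-zero vector.
    static double[] oneHot(int index, int categories) {
        double[] vector = new double[categories - 1];
        if (index < categories - 1) {
            vector[index] = 1.0;
        }
        return vector;
    }

    public static void main(String[] args) {
        Map<String, Integer> index = fitIndex(Arrays.asList("N", "N", "Y", "N"));
        for (Map.Entry<String, Integer> e : index.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue() + " -> "
                    + Arrays.toString(oneHot(e.getValue(), index.size())));
        }
    }
}
```

The same fitted index has to be reused at prediction time, which is why the post saves the fitted pipelines and reloads them in preprocessFeatures().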

The code.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{ Vector, Vectors }
import org.apache.spark.sql.Row;
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.evaluation.RegressionEvaluator
import akka.dispatch.Foreach
import org.apache.spark.ml.PipelineModel
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.tuning.CrossValidatorModel
import scala.collection.mutable.ListBuffer
import edu.nyu.realtimebd.analytics.nyctaxi.domain.NYCDomain.NYCParams
import org.apache.spark.sql.types.IntegerType

/** @author Ramandeep Singh */
object Analytics {

val sparkConf = new SparkConf().setAppName("NYC-TAXI-ANALYSIS").setMaster("local")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
val hiveCtxt = new HiveContext(sc)
var df: DataFrame = _
def initializeDataFrame(query: String): DataFrame = {
  // Query Hive once, drop rows with nulls, and cache the result for later calls.
  if (df == null) {
    df = hiveCtxt.sql(query).na.drop().cache()
  }
  df
}
def preprocessFeatures(df: DataFrame): DataFrame = {
  val stringColumns = Array("store_and_fwd_flag", "ratecodeid")
  var indexModel: PipelineModel = null
  var oneHotModel: PipelineModel = null
  // Reuse a previously saved indexer pipeline if one exists.
  try {
    indexModel = sc.objectFile[PipelineModel]("nycyellow.model.indexModel").first()
  } catch {
    case e: InvalidInputException => println()
  }
  if (indexModel == null) {
    val stringIndexTransformer: Array[PipelineStage] = stringColumns.map(
      cname => new StringIndexer().setInputCol(cname).setOutputCol(s"${cname}_index"))
    val indexedPipeline = new Pipeline().setStages(stringIndexTransformer)
    indexModel = indexedPipeline.fit(df)
    sc.parallelize(Seq(indexModel), 1).saveAsObjectFile("nycyellow.model.indexModel")
  }

  var df_indexed = indexModel.transform(df)
  stringColumns.foreach { x => df_indexed = df_indexed.drop(x) }
  val indexedColumns = df_indexed.columns.filter(colName => colName.contains("_index"))
  val oneHotEncodedColumns = indexedColumns
  // Reuse a previously saved one-hot pipeline if one exists.
  try {
    oneHotModel = sc.objectFile[PipelineModel]("nycyellow.model.onehot").first()
  } catch {
    case e: InvalidInputException => println()
  }

  if (oneHotModel == null) {
    val oneHotTransformer: Array[PipelineStage] = oneHotEncodedColumns.map { cname =>
      // The output column name is an assumption; the source line is truncated here.
      new OneHotEncoder().setInputCol(cname).setOutputCol(s"${cname}_vect")
    }
    val oneHotPipeline = new Pipeline().setStages(oneHotTransformer)
    oneHotModel = oneHotPipeline.fit(df_indexed)

    sc.parallelize(Seq(oneHotModel), 1).saveAsObjectFile("nycyellow.model.onehot")
  }

  df_indexed = oneHotModel.transform(df_indexed)
  indexedColumns.foreach { colName => df_indexed = df_indexed.drop(colName) }
  df_indexed
}
def buildPriceAnalysisModel(query: String): Unit = {
  // Load (or reuse) the cached DataFrame for the query, then encode the categorical columns.
  var df_indexed = preprocessFeatures(initializeDataFrame(query))
  df_indexed.columns.foreach(x => println("Preprocessed Columns Model Training" + x))
  val df_splitData: Array[DataFrame] = df_indexed.randomSplit(Array(0.7, 0.3), 11L)
  val trainData = df_splitData(0)
  val testData = df_splitData(1)
  //drop target variable
  val testData_x = testData.drop("fare_amount")
  val testData_y = testData.select("fare_amount")
  val columnsToTransform = trainData.drop("fare_amount").columns
  //Make feature vector
  val vectorAssembler = new VectorAssembler().
    setInputCols(columnsToTransform).setOutputCol("features")
  columnsToTransform.foreach { x => println(x) }
  val trainDataTemp = vectorAssembler.transform(trainData).withColumnRenamed("fare_amount", "label")
  val testDataTemp = vectorAssembler.transform(testData_x)
  val trainDataFin = trainDataTemp.select("features", "label")
  val testDataFin = testDataTemp.select("features")
  val linearRegression = new LinearRegression()
  trainDataFin.columns.foreach(x => println("Final Column =>" + x))
  //Params for tuning the model: 3 x 5 = 15 parameter combinations.
  val paramGridMap = new ParamGridBuilder()
    .addGrid(linearRegression.maxIter, Array(10, 100, 1000))
    .addGrid(linearRegression.regParam, Array(0.1, 0.01, 0.001, 1, 10)).build()
  //5 fold cross validation
  val cv = new CrossValidator().setEstimator(linearRegression).
    setEvaluator(new RegressionEvaluator()).setEstimatorParamMaps(paramGridMap).setNumFolds(5)
  //Fit the model
  val model = cv.fit(trainDataFin)
  val modelResult = model.transform(testDataFin)
  val predictionAndLabels = modelResult.map(r => r.getAs[Double]("prediction")).zip(testData_y.map(R => R.getAs[Double](0)))
  val regressionMetrics = new RegressionMetrics(predictionAndLabels)
  //Print the results
  println(s"R-Squared= ${regressionMetrics.r2}")
  println(s"Explained Variance=${regressionMetrics.explainedVariance}")
  println(s"MAE= ${regressionMetrics.meanAbsoluteError}")
  val lrModel = model.bestModel.asInstanceOf[LinearRegressionModel]
  //Save the fitted cross-validated model for later predictions
  sc.parallelize(Seq(model), 1).saveAsObjectFile("nycyellow.model")
}

def predictFare(list: ListBuffer[NYCParams]): DataFrame = {
  var nycModel: CrossValidatorModel = null
  try {
    nycModel = sc.objectFile[CrossValidatorModel]("nycyellow.model").first()
  } catch {
    case e: InvalidInputException => println()
  }
  if (nycModel == null) {
    // Train on first use. The first line of this select list is an assumption inferred
    // from the prediction schema below; the start of the source query is truncated.
    buildPriceAnalysisModel("""select trip_distance, store_and_fwd_flag, ratecodeid,
   (cast(journey_end_time as double)-cast(journey_start_time as double)) as duration,
   hour(journey_start_time) as start_hour,
   minute(journey_start_time) as start_minute,
   second(journey_start_time) as start_second,
   fare_amount from nyc_taxi_data_limited
   where start_latitude <> 0 and trip_distance >0
  and journey_end_time>journey_start_time and
  trip_distance <200 and fare_amount>1 limit 12000""")
    nycModel = sc.objectFile[CrossValidatorModel]("nycyellow.model").first()
  }
  val schema = StructType(Array(
    StructField("trip_distance", DoubleType, true),
    StructField("duration", DoubleType, true),
    StructField("store_and_fwd_flag", StringType, true),
    StructField("ratecodeid", DoubleType, true),
    StructField("start_hour", IntegerType, true),
    StructField("start_minute", IntegerType, true),
    StructField("start_second", IntegerType, true)))
  val rows: ListBuffer[Row] = new ListBuffer
  list.foreach(x => rows += Row(x.trip_distance, x.duration, x.store_and_fwd_flag, x.ratecodeid, x.start_hour, x.start_minute, x.start_second))
  val row = sc.parallelize(rows)
  val dfStructure = sqlContext.createDataFrame(row, schema)
  var preprocessed = preprocessFeatures(dfStructure)
  preprocessed.columns.foreach(x => println("Preprocessed Columns " + x))
  // Assemble the same feature vector layout that was used for training.
  val vectorAssembler = new VectorAssembler().
    setInputCols(preprocessed.columns).setOutputCol("features")
  preprocessed = vectorAssembler.transform(preprocessed)
  val results = nycModel.transform(preprocessed.select("features"))
  results
}
}



Upon training the model, it gave the following results against the test data set.
R-Squared= 0.954496421456682
MAE= 1.1704343793855545
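
For readers unfamiliar with the two measures, here is a minimal plain-Java sketch of how they are usually defined: R-squared as 1 - SS_res / SS_tot and MAE as the mean of |actual - predicted|. The class name RegressionScores and the sample numbers are made up for illustration, and the sketch assumes these textbook definitions match what RegressionMetrics reports.

```java
// Textbook definitions of MAE and R-squared; not the Spark implementation.
final class RegressionScores {

    // Mean of the absolute differences between actual and predicted values.
    static double meanAbsoluteError(double[] actual, double[] predicted) {
        double sum = 0.0;
        for (int i = 0; i < actual.length; i++) {
            sum += Math.abs(actual[i] - predicted[i]);
        }
        return sum / actual.length;
    }

    // 1 - (residual sum of squares / total sum of squares around the mean).
    static double rSquared(double[] actual, double[] predicted) {
        double mean = 0.0;
        for (double y : actual) {
            mean += y;
        }
        mean /= actual.length;
        double ssRes = 0.0;
        double ssTot = 0.0;
        for (int i = 0; i < actual.length; i++) {
            ssRes += (actual[i] - predicted[i]) * (actual[i] - predicted[i]);
            ssTot += (actual[i] - mean) * (actual[i] - mean);
        }
        return 1.0 - ssRes / ssTot;
    }

    public static void main(String[] args) {
        double[] actualFare = {10.0, 20.0, 30.0};     // made-up fares
        double[] predictedFare = {12.0, 18.0, 33.0};  // made-up predictions
        System.out.println("R-Squared= " + rSquared(actualFare, predictedFare));
        System.out.println("MAE= " + meanAbsoluteError(actualFare, predictedFare));
    }
}
```

Read this way, an R-squared of about 0.95 means the model explains roughly 95% of the variance in the test fares, and an MAE of about 1.17 means predictions are off by a little over a dollar on average.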
To predict the fares for our inputs we can invoke the predictFare() method. Example code to do so is shown below.

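object TestAnalytics {
  def main(args: Array[String]): Unit = {
    val testAnalytics = Analytics
    val testData = new ListBuffer[NYCParams]()
    // trip_distance, duration (seconds), store_and_fwd_flag, ratecodeid, start hour, minute, second
    testData += NYCParams(10.6, 600.0, "N", 1.0, 10, 2, 33)
    val result = testAnalytics.predictFare(testData)
    result.show()
  }
}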
After the initial invocation all the models are stored in the directory from which the execution is carried out.
For the sample request above the result is shown below.
This prediction shows that a 10.6-mile journey covered in 10 minutes in an NYC yellow taxi would cost roughly 31 dollars.

This code is part of a project that I did. To browse the entire repository and access the dataset on GitHub, click here.
[1] To use Hive, hive-site.xml must be placed in the spark/conf directory.
[2] Rows with null columns are considered invalid records by ML models.
[3] This will be covered in a future post.

Posted on Sunday, June 26, 2016 by Ramandeep Singh Nanda

Google recently deprecated Google+ Sign-In and the process of obtaining OAuth access tokens via the GoogleAuthUtil.getToken API. They now recommend a single entry point via the new Google Sign-In API. The major reasons for doing so are that 1. it enhances the user experience and 2. it improves security (more here). Also, starting with Android 6.0, the GET_ACCOUNTS permission has to be requested at runtime; implementing this API eliminates the need for that permission.
The really exciting feature is the new silentSignIn API, which allows cross-device silent sign-in (essentially, if a user has signed in to your application on another platform, they won't be shown the sign-in prompt), provided that the requested scopes are the same. In addition, you don't have to use the GoogleAuthUtil.getToken API to obtain the tokens, as they are granted on the initial sign-in.
So if you have an Android application in which you previously implemented Google+ Sign-In and used other Google+ features, and you want to migrate it to the new Google Sign-In implementation, this post explains how to do so. The code differs slightly depending on whether you automate the lifecycle of GoogleApiClient (using enableAutoManage; this approach is recommended as it avoids boilerplate code) or manage the lifecycle yourself by implementing the ConnectionCallbacks interface. As the latter approach requires a bit more code, I will explain the process using it.

What needs to be changed

  • Replace mGoogleApiClient.connect() with mGoogleApiClient.connect(GoogleApiClient.SIGN_IN_MODE_OPTIONAL). This is required to allow the client to transition between authenticated and unauthenticated states and for use with GoogleSignInApi.
  • Build a GoogleSignInOptions instance. While building the instance, request the additional scopes via the requestScopes method (this is where you can request scopes such as SCOPE_PLUS_LOGIN and SCOPE_PLUS_PROFILE). Also, if you need to authenticate the user with your backend and want to obtain an authorization code so that your backend can access the APIs, use the requestIdToken(serverToken) and requestServerAuthCode(serverToken) methods. Here, unlike Google+ sign-in, the serverToken is just the clientId of the web application.
  • Build the GoogleApiClient instance, using the addApi method to add Auth.GOOGLE_SIGN_IN_API and Plus.API.
  • In the onStart method, connect the client using mGoogleApiClient.connect(GoogleApiClient.SIGN_IN_MODE_OPTIONAL), and in the onStop method disconnect the client. (You may also do this in the onResume and onPause methods.)
  • After the client is connected, first attempt silentSignIn, and if it fails with the code SIGN_IN_REQUIRED, attempt a fresh sign-in for the user.
  • After the sign-in is completed, you can invoke Plus.PeopleApi with the user's account ID to obtain the user's Google profile information.
  • To sign out the user, use the Auth.GoogleSignInApi.signOut method, and to revoke access, use the Auth.GoogleSignInApi.revokeAccess method.
  • Remove the android.permission.GET_ACCOUNTS permission from the Android manifest.
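
The silent-then-fresh decision in the steps above can be sketched in plain Java, independent of the Android SDK. This is a simplified illustration under stated assumptions: SignInFlow, Outcome and the two suppliers are hypothetical stand-ins for the real asynchronous calls, and the two status constants mirror the values of CommonStatusCodes.SUCCESS and CommonStatusCodes.SIGN_IN_REQUIRED as I understand them.

```java
import java.util.function.IntSupplier;

// Decision logic only: try a silent sign-in first, fall back to a fresh sign-in.
final class SignInFlow {
    static final int SUCCESS = 0;           // mirrors CommonStatusCodes.SUCCESS
    static final int SIGN_IN_REQUIRED = 4;  // mirrors CommonStatusCodes.SIGN_IN_REQUIRED

    enum Outcome { SILENT, INTERACTIVE, FAILED }

    // Each supplier stands in for a sign-in attempt and returns its status code.
    static Outcome signIn(IntSupplier silentSignIn, IntSupplier freshSignIn) {
        int status = silentSignIn.getAsInt();
        if (status == SUCCESS) {
            return Outcome.SILENT;            // no prompt shown to the user
        }
        if (status == SIGN_IN_REQUIRED) {
            // Only now show the interactive sign-in prompt.
            return freshSignIn.getAsInt() == SUCCESS ? Outcome.INTERACTIVE : Outcome.FAILED;
        }
        return Outcome.FAILED;                // any other error is not retried here
    }

    public static void main(String[] args) {
        System.out.println(signIn(() -> SIGN_IN_REQUIRED, () -> SUCCESS));
        System.out.println(signIn(() -> SUCCESS, () -> SUCCESS));
    }
}
```

In the real implementation the fresh sign-in is started with an Intent and its result arrives later in onActivityResult, so the two steps are not a single synchronous call as they are in this sketch.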

Here's the relevant code.

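  protected void onCreate(Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);
      // Here, unlike Google+ sign-in, the serverToken is just the clientId of the web application
      GoogleSignInOptions gso = new GoogleSignInOptions.Builder(GoogleSignInOptions.DEFAULT_SIGN_IN)
              .requestScopes(Plus.SCOPE_PLUS_LOGIN, Plus.SCOPE_PLUS_PROFILE)
              .requestIdToken(serverToken).requestServerAuthCode(serverToken)
              .build();
      mGoogleApiClient = new GoogleApiClient.Builder(this)
              .addConnectionCallbacks(this).addOnConnectionFailedListener(this)
              .addApi(Auth.GOOGLE_SIGN_IN_API, gso).addApi(Plus.API)
              .build();
  }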

  protected void onResume() {
      super.onResume();
      // Here isConnected is just a flag that checks whether the user is connected to the internet.
      /** To avoid execution of this block you can check whether the user previously signed in on this device by storing a userIdToken and checking whether the user needs to be signed in automatically or not. */
      if (!mGoogleApiClient.isConnecting() && !mGoogleApiClient.isConnected() && isConnected) {
          mGoogleApiClient.connect(GoogleApiClient.SIGN_IN_MODE_OPTIONAL);
      } // Here isSignedIn is a boolean flag that tracks whether the user is signed in or not.
      else if (isConnected && mGoogleApiClient.isConnected() && !isSignedIn) {
          attemptSilentSignIn();
      }
  }

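  private void signInUsingNewAPI() {
      if (!isSignedIn && isConnected) {
          // Try the silent sign-in first; a fresh sign-in is attempted only if it fails.
          attemptSilentSignIn();
      }
  }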


  private void attemptSilentSignIn() {
      OptionalPendingResult<GoogleSignInResult> opr = Auth.GoogleSignInApi.silentSignIn(mGoogleApiClient);
      if (opr.isDone()) {
          // If the user's cached credentials are valid, the OptionalPendingResult will be "done"
          // and the GoogleSignInResult will be available instantly.
          Log.d(TAG, "Got cached sign-in");
          GoogleSignInResult result = opr.get();
          handleSignInResult(result);
      } else {
          // If the user has not previously signed in on this device or the sign-in has expired,
          // this asynchronous branch will attempt to sign in the user silently.  Cross-device
          // single sign-on will occur in this branch.
          opr.setResultCallback(new ResultCallback<GoogleSignInResult>() {
              @Override
              public void onResult(GoogleSignInResult googleSignInResult) {
                  handleSignInResult(googleSignInResult);
              }
          });
      }
  }

  private void handleSignInResult(GoogleSignInResult result) {
      if (!result.getStatus().isSuccess()) {
          isSignedIn = false;
          mIntentInProgress = false;
          if (result.getStatus().hasResolution()
                  || result.getStatus().getStatusCode() == CommonStatusCodes.SIGN_IN_REQUIRED) {
              // Rather than using startResolutionForResult, we invoke our own method, which attempts
              // a fresh sign-in; any error is handled in the onActivityResult method.
              freshSignIn();
          }
      } else {
          mIntentInProgress = false;
          isSignedIn = true;
          final GoogleSignInAccount account = result.getSignInAccount();
          // Maybe save this result.
          SharedPreferences.Editor editor = preferences.edit();
          editor.putString("client_id_token", account.getIdToken());
          editor.apply();
          // You can pass these credentials to your server from here.
          // Invoke the Google+ People API
          Plus.PeopleApi.load(mGoogleApiClient, account.getId()).setResultCallback(new ResultCallback<People.LoadPeopleResult>() {
              @Override
              public void onResult(@NonNull People.LoadPeopleResult loadPeopleResult) {
                  Person person = loadPeopleResult.getPersonBuffer().get(0);
                  // Method that obtains the user info
                  getProfileInfo(person, account.getEmail());
              }
          });
      }
  }


  private void freshSignIn() {
      Intent signInIntent = Auth.GoogleSignInApi.getSignInIntent(mGoogleApiClient);
      startActivityForResult(signInIntent, RC_SIGN_IN);
  }

  protected void onActivityResult(int requestCode, int responseCode,
                                  Intent intent) {
      if (requestCode == RC_RESOLVE_ERROR) {
          mIntentInProgress = false;
          if (responseCode != RESULT_OK) {
              isSignedIn = false;
              // Maybe show a dialog to the user?
          }
          // Attempt connection again.
          if (!mGoogleApiClient.isConnecting()) {
              mGoogleApiClient.connect(GoogleApiClient.SIGN_IN_MODE_OPTIONAL);
          }
      } else if (requestCode == RC_SIGN_IN) {
          GoogleSignInResult result = Auth.GoogleSignInApi.getSignInResultFromIntent(intent);
          handleSignInResult(result);
      }
  }

  // Connection callbacks
  public void onConnected(Bundle bundle) {
      if (!isSignedIn) {
          signInUsingNewAPI();
      }
  }

  public void onConnectionSuspended(int i) {
      isSignedIn = false;
      if (!isSignedIn && isConnected) {
          mGoogleApiClient.connect(GoogleApiClient.SIGN_IN_MODE_OPTIONAL);
      }
  }

  public void onConnectionFailed(ConnectionResult result) {
      if (!result.hasResolution()) {
          GoogleApiAvailability.getInstance().getErrorDialog(
                  this, result.getErrorCode(), RC_SIGN_IN).show();
          return;
      }
      if (!mIntentInProgress) {
          // Store the ConnectionResult for later usage
          mConnectionResult = result;

          if (!isSignedIn) {
              // The user has already clicked 'sign-in' so we attempt to resolve all
              // errors until the user is signed in, or they cancel.
              resolveSignInError();
          }
      }
  }

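/**
 * Method to resolve any sign-in errors
 */
private void resolveSignInError() {
    if (mConnectionResult.hasResolution()) {
        try {
            mIntentInProgress = true;
            mConnectionResult.startResolutionForResult(this, RC_RESOLVE_ERROR);
        } catch (IntentSender.SendIntentException e) {
            mIntentInProgress = false;
            // The resolution intent could not be sent, so try connecting again.
            mGoogleApiClient.connect(GoogleApiClient.SIGN_IN_MODE_OPTIONAL);
        }
    }
}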

// Sign out and revoke methods
  /** Sign out from Google */
  public void signOutFromGoogle() {
      if (mGoogleApiClient.isConnected()) {
          Auth.GoogleSignInApi.signOut(mGoogleApiClient).setResultCallback(
                  new ResultCallback<Status>() {
                      @Override
                      public void onResult(Status status) {
                          isSignedIn = false;
                          // Do other stuff here.
                          // Builds a fresh instance of GoogleApiClient
                      }
                  });
      }
  }

  /** Revoke access from Google */
  public void revokeGplusAccess() {
      if (mGoogleApiClient.isConnected()) {
          Auth.GoogleSignInApi.revokeAccess(mGoogleApiClient).setResultCallback(
                  new ResultCallback<Status>() {
                      @Override
                      public void onResult(Status status) {
                          isSignedIn = false;
                          // Do other stuff here.
                          // Builds a fresh instance of GoogleApiClient
                          // You can inform your server of this change
                      }
                  });
      }
  }

  // Other utility methods
  private void hideProgressDialog() {
      if (mProgressDialog != null && mProgressDialog.isShowing()) {
          mProgressDialog.hide();
      }
  }

  private void showProgressDialog() {
      if (mProgressDialog == null) {
          mProgressDialog = new ProgressDialog(this);
          mProgressDialog.setMessage("Signing In");
      }
      mProgressDialog.show();
  }

I hope this code helps you move to the new Google Sign-In implementation.

Posted on Sunday, June 26, 2016 by Ramandeep Singh Nanda

Hive or Impala ?

Hive and Impala both support SQL operations, but the performance of Impala is far superior to that of Hive. Although the Spark SQL engine and HiveContext have made Hive queries significantly faster, Impala still performs better. The reason is that Impala already has daemons running on the worker nodes, so it avoids the overhead incurred in creating map and reduce jobs.
The query that I will mention later ran almost 10X faster on Impala than on Hive (61 seconds vs around 600 seconds), and Impala is known to give even better performance.

Schema on read vs Schema on write

Schema on read differs from schema on write in that data is not validated until it is read. Although schema on read offers the flexibility of defining multiple schemas for the same data, it can cause nasty runtime errors. For example, Hive and Impala are very particular about the timestamp format that they recognize and support. One workaround for such bad records is to declare the column as String rather than timestamp and then use the cast operator to convert the values to timestamps. This way bad records are skipped and the query does not error out.

cast(field_name as timestamp)
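
The effect of the cast trick can be mimicked in plain Java. The sketch below is an analogy under stated assumptions, not Hive or Impala code: the class LenientCast and its methods are hypothetical, and it assumes that a failed cast yields NULL and that the surrounding filter then drops the row.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Analogy for cast(string_column as timestamp): bad values become null instead of failing.
final class LenientCast {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss");

    // Returns the parsed timestamp, or null when the text is missing or malformed.
    static LocalDateTime castToTimestamp(String text) {
        if (text == null) {
            return null;
        }
        try {
            return LocalDateTime.parse(text.trim(), FORMAT);
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    // Keeps only the rows whose timestamp could be parsed.
    static List<LocalDateTime> validOnly(List<String> rawValues) {
        List<LocalDateTime> valid = new ArrayList<>();
        for (String raw : rawValues) {
            LocalDateTime parsed = castToTimestamp(raw);
            if (parsed != null) {
                valid.add(parsed);
            }
        }
        return valid;
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList("2015-01-01 10:15:00", "not a date", "2015-01-01 11:00:00");
        System.out.println(validOnly(raw));
    }
}
```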

Window Functions, Top-N Queries, PL/SQL

Hive and Impala do not support update queries, but they do support the insert into ... select operation. Hive and Impala also support window functions, which makes life easier because neither supports PL/SQL procedures.
In the example below, I am using the NYC Yellow Taxi dataset for January 2015. The query filters out records with invalid timestamps and selects the first 500 records per hour for 1 January 2015.

/**Top-N Subquery selects first 500 records per hour for a day*/
insert into nyc_taxi_data_limited  select VendorID, tpep_pickup_datetime , tpep_dropoff_datetime , passenger_count ,trip_distance ,pickup_longitude ,pickup_latitude,RateCodeID ,store_and_fwd_flag  ,dropoff_longitude ,dropoff_latitude ,payment_type ,fare_amount ,extra,mta_tax ,tip_amount,tolls_amount,improvement_surcharge,total_amount from ( select *,
row_number() over (partition by trunc(cast(tpep_pickup_datetime as timestamp), 'HH') order by trunc(cast(tpep_pickup_datetime as timestamp), 'HH') desc)
as rownumb from nyc_taxi_data where cast(tpep_pickup_datetime as timestamp) between cast('2015-01-01 00:00:00' as timestamp) and cast('2015-01-01 23:59:59' as timestamp)
) as q  where rownumb<=500;

Note the use of the window function row_number, the ordering by truncated timestamp, and the cast operator to avoid invalid records.
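
The top-N-per-group logic of the query can be sketched in plain Java. This is an illustration only: TopNPerHour and firstNPerHour are hypothetical names, and because the SQL orders each partition by the truncated hour (which is constant within a partition), the choice of rows within an hour is effectively arbitrary; the sketch simply keeps the first n it encounters.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Keeps at most n pickups per hour, like "row_number() over (partition by hour) <= n".
final class TopNPerHour {

    static Map<LocalDateTime, List<LocalDateTime>> firstNPerHour(List<LocalDateTime> pickups, int n) {
        Map<LocalDateTime, List<LocalDateTime>> byHour = new TreeMap<>();
        for (LocalDateTime pickup : pickups) {
            // truncatedTo(HOURS) plays the role of trunc(timestamp, 'HH').
            LocalDateTime hour = pickup.truncatedTo(ChronoUnit.HOURS);
            List<LocalDateTime> bucket = byHour.computeIfAbsent(hour, h -> new ArrayList<>());
            if (bucket.size() < n) {   // equivalent to keeping rows with rownumb <= n
                bucket.add(pickup);
            }
        }
        return byHour;
    }

    public static void main(String[] args) {
        List<LocalDateTime> pickups = Arrays.asList(
                LocalDateTime.of(2015, 1, 1, 10, 5),
                LocalDateTime.of(2015, 1, 1, 10, 20),
                LocalDateTime.of(2015, 1, 1, 10, 40),
                LocalDateTime.of(2015, 1, 1, 11, 1));
        System.out.println(firstNPerHour(pickups, 2));
    }
}
```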

What's the catch ?

Given the benefits of Impala, why would one ever use Hive? The answer lies in the fact that Impala queries are not fault tolerant: if a node fails while a query is running, the whole query has to be rerun, whereas Hive on MapReduce retries the failed tasks.


Although Impala and Hive do not offer the entire repertoire of functionality supported by traditional RDBMSs, they come closest to it in the world of distributed systems, and they offer scalable, large-scale data analysis capability.

Posted on Sunday, June 26, 2016 by Ramandeep Singh Nanda

The Idea

Java 8 introduced functional programming support, a powerful feature that was missing from earlier versions. One of the benefits of functional programming is that it can be used to implement the decorator pattern without inheritance. A common requirement is to implement some kind of rate limiting for web services. Ideally you want a separation of concerns between the actual business logic and the rate-limiting logic. With Java 8, we can use method references to implement this separation of concerns and implement the decorator pattern.

The code

The code fragment below shows the implementation of the pattern. It is an example of integration with the Lyft API. The full source code is available here.

/**
 * Generic method which can invoke any function without applying a rate limit
 * @param method    the function to invoke on each input map
 * @param inputList The list of Maps, each of which contains the key value pair of service parameters
 * @param <R>       Generic Return object type in the list
 * @param <K>       Type of Key in Map
 * @param <V>       Type of Value in Map
 * @return A list with object type <R>
 */
private <R, K, V> List<R> invokeWithoutRateLimit(Function<Map, R> method, List<Map<K, V>> inputList) {
    List<R> returnList = new ArrayList<>();
    inputList.stream().forEach(m -> {
        returnList.add(method.apply(m));
    });
    return returnList;
}

/**
 * Generic method which can invoke any function while applying a rate limit
 * It uses RxJava and blocking invocation
 * @param method    the function to invoke on each input map
 * @param inputList The list of Maps, each of which contains the key value pair of service parameters
 * @param <R>       Generic Return object type in the list
 * @param <K>       Type of Key in Map
 * @param <V>       Type of Value in Map
 * @return A list with object type <R>
 */
private <R, K, V> List<R> invokeWithRateLimit(Function<Map, R> method, List<Map<K, V>> inputList) {
    List<R> returnList = new ArrayList<>();
    // Pair each input with a timer tick so that one item is emitted per RATE_LIMIT seconds.
    // The zip(...) opening and the blocking tail are assumptions; those source lines are missing.
    Observable.zip(Observable.from(inputList),
            Observable.interval(RATE_LIMIT, TimeUnit.SECONDS), (obs, timer) -> obs)
            .doOnNext(item -> {
                        R result = method.apply(item);
                        returnList.add(result);
                    }
            ).toList().toBlocking().first();
    return returnList;
}

/**
 * This method accepts a list of coordinates and returns the estimated
 * fare for different Lyft rides
 * @param costRequestList     The list of coordinates
 * @param invokeWithRateLimit Apply rate limiting
 * @return A list of prices per request
 */
public List<CostEstimates> getCostEstimates(List<Map<String, Float>> costRequestList, boolean invokeWithRateLimit) {
    if (invokeWithRateLimit) {
        return invokeWithRateLimit(this::getCostEstimate, costRequestList);
    } else {
        return invokeWithoutRateLimit(this::getCostEstimate, costRequestList);
    }
}

The getCostEstimates() method above shows how to pass a method reference to the methods invokeWithRateLimit() and invokeWithoutRateLimit(). Each of these methods adds some custom preprocessing logic (such as rate limiting using RxJava) and then invokes the supplied method through apply(). This implementation of the decorator pattern is much easier to grasp than going via the inheritance route.
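
The same idea can be shown with the JDK alone. The following is a minimal sketch, not the Lyft client's code: RateLimitDecorator, withRateLimit and invokeAll are hypothetical names, and the sleep-based limiter is a stand-in I chose for the RxJava interval so that the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Decorator via function wrapping: the business function stays unaware of rate limiting.
final class RateLimitDecorator {

    // Returns a function that behaves like fn but spaces calls at least minIntervalMillis apart.
    static <T, R> Function<T, R> withRateLimit(Function<T, R> fn, long minIntervalMillis) {
        final long[] lastCall = {0L};  // time of the previous call; 0 means "never called"
        return input -> {
            synchronized (lastCall) {
                long wait = lastCall[0] + minIntervalMillis - System.currentTimeMillis();
                if (wait > 0) {
                    try {
                        Thread.sleep(wait);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while rate limiting", e);
                    }
                }
                lastCall[0] = System.currentTimeMillis();
                return fn.apply(input);
            }
        };
    }

    // Applies fn to every input in order and collects the results.
    static <T, R> List<R> invokeAll(Function<T, R> fn, List<T> inputs) {
        List<R> results = new ArrayList<>();
        for (T input : inputs) {
            results.add(fn.apply(input));
        }
        return results;
    }

    public static void main(String[] args) {
        Function<Integer, Integer> doubleIt = x -> x * 2;  // stands in for the service call
        System.out.println(invokeAll(doubleIt, Arrays.asList(1, 2, 3)));
        System.out.println(invokeAll(withRateLimit(doubleIt, 200), Arrays.asList(1, 2, 3)));
    }
}
```

Because the limiter returns another Function, the caller decides whether to decorate, and further decorators (logging, retries) could be stacked the same way.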
You can use the following link to view the entire code on Github repository.
Lyft-Client on Github.

Posted on Sunday, June 26, 2016 by Ramandeep Singh Nanda

You might run into a scenario that requires conditional authentication with Retrofit 2.0.
This post provides an example of integration with the Lyft API. With the Lyft API, we first need to authenticate against the oauth/token endpoint to obtain the OAuth token, and then use this accessToken in other service calls. Such access tokens also have an expiry time (1 hour), so ideally there should be a mechanism to handle expiry.
One lazy solution (which turns out to work perfectly well) is to use interceptors and check whether the HTTP response code from the service is 401. If it is, you can assume that the token has either expired or was never obtained; either way, you need to re-authenticate and query the endpoint to obtain a new accessToken.
The code block below shows how this is done. To access the entire source code you can visit Lyft-Client on Github.

/**
 * This method initializes the Retrofit clients:
 * a) one for the initial authentication endpoint
 * b) another for the other service requests
 */
private void initializeRetrofitClients() {
    OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
    OkHttpClient clientNormal;
    OkHttpClient clientAuthenticated;
    builder.interceptors().add(new Interceptor() {
        @Override
        public okhttp3.Response intercept(Chain chain) throws IOException {
            Request originalRequest = chain.request();
            // Send the request with the currently stored access token
            Request.Builder requestBuilder = originalRequest.newBuilder()
                    .header("Authorization", "Bearer " + accessToken)
                    .method(originalRequest.method(), originalRequest.body());
            okhttp3.Response response = chain.proceed(requestBuilder.build());
            // A 401 implies that the token has expired
            // or was never initialized
            if (response.code() == 401) {
                tokenExpired = true;
                logger.info("Token Expired");
                // Obtain a fresh token, then retry the original request once
                getAuthenticationToken();
                // Release the rejected response before issuing the retry
                response.body().close();
                requestBuilder = originalRequest.newBuilder()
                        .header("Authorization", "Bearer " + accessToken)
                        .method(originalRequest.method(), originalRequest.body());
                response = chain.proceed(requestBuilder.build());
            }
            return response;
        }
    });
    clientAuthenticated = builder.build();
    retrofitAuthenticated = new Retrofit.Builder().client(clientAuthenticated)
    OkHttpClient.Builder builder1 = new OkHttpClient().newBuilder();
    builder1.authenticator(new Authenticator() {
        @Override
        public Request authenticate(Route route, okhttp3.Response response) throws IOException {
            // Answer the authentication challenge with HTTP basic credentials
            String authentication = Credentials.basic(CLIENT_ID, CLIENT_SECRET);
            Request.Builder builder = response.request().newBuilder().addHeader("Authorization", authentication);
            return builder.build();
        }
    });
    clientNormal = builder1.build();
    retrofit = new Retrofit.Builder().client(clientNormal).
/**
 * Invoked only when the access token is required
 * or has expired
 */
private void getAuthenticationToken() {
    LyftService lyftService = this.retrofit.create(LyftService.class);
    Call<OAuthResponse> authRequestCall = lyftService.getAccessToken(oAuthRequest);
    Response<OAuthResponse> response = null;
    try {
        response = authRequestCall.execute();
        if (response.isSuccessful()) {
            accessToken = response.body().getAccessToken();
        }
    } catch (IOException e) {
        logger.error("Exception occurred due to ", e);
    }
}

As the code example above shows, we build two OkHttpClient objects. The clientNormal object is configured to use HTTP basic authentication and is used by the retrofit object to query the getAccessToken endpoint and obtain the access token, which the other Lyft service endpoints require. The clientAuthenticated object uses an interceptor to set the Authorization header to Bearer followed by the value of accessToken, which is required for all other service endpoints.
In the method initializeRetrofitClients, the interceptor initially just invokes the service endpoint with the current value of accessToken (by calling chain.proceed). If the response code is 401, we invoke getAuthenticationToken, followed by another call to chain.proceed with the new value of accessToken. For subsequent calls, the interceptor uses the stored value of accessToken. This lazy invocation to obtain the access token is better because the logic for deciding when to obtain accessToken is not hardcoded. In addition, it keeps the code simple by avoiding unnecessary checks.
Hope this post was helpful in clarifying the use of interceptors for conditional authentication.
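The retry-on-401 flow can be isolated from OkHttp and Retrofit to make the control flow easier to follow. The sketch below is a simplified, library-free model and not the Lyft client's code: the class name, the tokenSource supplier (standing in for getAuthenticationToken) and the endpoint function (standing in for chain.proceed) are all assumptions made for illustration.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of lazy token acquisition; no HTTP library is involved.
final class LazyTokenSketch {
    private final Supplier<String> tokenSource; // stands in for getAuthenticationToken()
    private String accessToken;                 // null until the first 401 is seen
    int refreshCount = 0;                       // how many times a token was fetched

    LazyTokenSketch(Supplier<String> tokenSource) {
        this.tokenSource = tokenSource;
    }

    // Calls the endpoint with the stored token; on a 401, fetches a token and retries once.
    int call(Function<String, Integer> endpoint) {
        int status = endpoint.apply(accessToken);
        if (status == 401) {
            accessToken = tokenSource.get();
            refreshCount++;
            status = endpoint.apply(accessToken);
        }
        return status;
    }

    public static void main(String[] args) {
        LazyTokenSketch client = new LazyTokenSketch(() -> "token-1");
        // Fake endpoint: accepts only "token-1" and rejects everything else with 401.
        Function<String, Integer> endpoint = token -> "token-1".equals(token) ? 200 : 401;
        System.out.println(client.call(endpoint)); // first call triggers one token fetch
        System.out.println(client.call(endpoint)); // second call reuses the stored token
        System.out.println(client.refreshCount);
    }
}
```

The token is fetched only when a call is actually rejected, so no separate check for token presence or expiry is needed before each request.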
You can use the following link to view the entire code in the GitHub repository.
Lyft-Client on Github.

Posted on Sunday, June 26, 2016 by Ramandeep Singh Nanda

Mar 11, 2016

Blogger does not support LaTeX, and Windows Live Writer is being redeveloped. So, in the meantime, I have written a few posts on the Pelican blog and thought I might as well link to them here.

Why you should prefer to use the square root of Gini Index: This post examines the advantages of using the Gini Index as the criterion for building decision trees.

Do tweets have predictive power: This post examines whether tweets have an effect on the opening weekend revenue of box office movies.

Posted on Friday, March 11, 2016 by Ramandeep Singh Nanda

Aug 5, 2015

Earlier, I had covered an example here, which showed how to dynamically create users and map the application roles to enterprise groups. In this post, the sample application is extended to show how you can query the application roles from the application stripe (application-specific policies).
To query the application-specific roles, you need to access the application's policy from the policy store, and then you can directly invoke either searchRoles(String roleName) or searchRoles(String attributeToSearchRolesBy, String attributeValue, String equalityOrInequalityFlag). The response from the method is a List<AppRoleEntry>. The snippet below shows the code for doing so. Note that there is another, much more flexible method to search across application stripes, policyStore.getAppRoles(StoreAppRoleSearchQuery obj), but it is not implemented for the embedded policy store and throws UnsupportedOperationException.
    /**
     * Given an application stripe name and a role name, this can be used to search for a role name.
     * This method also performs a wildcard search.
     *
     * @param roleName the role name to search for
     * @param applicationStripeName the application stripe name
     */
    public List<AppRoleEntry> searchAppRoleInApplicationStripe(String roleName,
                                                               String applicationStripeName) {
        JpsContext ctxt = IdentityStoreConfigurator.jpsCtxt;

        PolicyStore ps = ctxt.getServiceInstance(PolicyStore.class);
        ApplicationPolicy policy;

        try {
            policy = ps.getApplicationPolicy(applicationStripeName);
            return policy.searchAppRoles(ApplicationRoleAttributes.NAME.toString(), roleName, false);
        } catch (PolicyStoreException e) {
            throw new RuntimeException(e);
        }
    }

    private static final class IdentityStoreConfigurator {
        private static final JpsContext jpsCtxt = initializeFactory();

        private static JpsContext initializeFactory() {
            String methodName =
            JpsContextFactory tempFactory;
            JpsContext jpsContext;
            try {
                tempFactory = JpsContextFactory.getContextFactory();
                jpsContext = tempFactory.getContext();
            } catch (JpsException e) {
                DemoJpsLogger.severe("Exception in " + methodName + " " +
                                     e.getMessage() + " ", e);
                throw new RuntimeException("Exception in " + methodName + " " +
                                           e.getMessage() + " ", e);
            }
            return jpsContext;
        }
    }


You can also do various other operations on the policy store and alter the application specific policies, once you have access to those operations. A key thing to note is these operations require specific PolicyStoreAccessPermissions to be granted in the jazn-data.xml. The steps to do so are mentioned below.

  1. Define a resource type with the permission class PolicyStoreAccessPermission and the necessary actions that you want to grant access to (in this example, I am granting access to all operations, signified by *). The snippet is shown below:-


  2. Next, create the resources that are to be granted permissions. In this case, I have created two of them: the first one is the superset that allows access to all the application stripes, and the second one grants access to only this application's stripe. The snippet is shown below:-

            <name>context=APPLICATION, name=*</name>

  3. In the last step you have to assign these resources to the application roles or groups.

[Screenshots: custom_permission, application_stripe_search]

The enterprise identity store provider used here is the embedded WebLogic LDAP. To run the application properly, you will need to configure a password for it in WebLogic and set the password in jps-config.xml, as shown in the screenshot below.


To run the application, the username/password combination is john/oracle123. To view the search roles screen, either run the SearchRoles.jspx or click on the Search Roles link in the left navigation bar. The link to download the application is mentioned below:-

Download the application.

Posted on Wednesday, August 05, 2015 by Ramandeep Singh Nanda