Wednesday, October 25, 2017

Run shell script using Oozie job

In this post I will show how to automate shell scripts using the Oozie framework.

First, create a job.properties file:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8050
queueName=default
oozie.wf.application.path=/<Hdfs Path>/sampleworkflow.xml
oozie.use.system.libpath=true
oozie.libpath=<shared lib path on Hdfs>

Next, create sampleworkflow.xml and place it in HDFS:
<workflow-app name="shell-wf" xmlns="uri:oozie:workflow:0.4">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>sample.sh</exec>
            <file>/<Hdfs Path of the shell script>/sample.sh</file>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>


Running the job:
oozie job -oozie http://localhost:11000/oozie -config job.properties -run


Java Client for Secure Web Socket

This post shows how to connect to a WebSocket that is secured with Basic Authentication.

I will be using the javax.websocket-api jar file.

Create a WebSocketContainer object:

WebSocketContainer container = ContainerProvider.getWebSocketContainer();

Using the container, connect to the service:

container.connectToServer(endpoint, clientConfig, new URI("wss://hostname:port/demo"));


Credentials are passed via the clientConfig object:

ClientEndpointConfig.Configurator configurator = new ClientEndpointConfig.Configurator() {
    @Override
    public void beforeRequest(Map<String, List<String>> headers) {
        // Add the Basic Authentication header before the handshake request is sent
        String credentials = "username:password";
        headers.put("Authorization",
                Arrays.asList("Basic " + Base64.getEncoder().encodeToString(credentials.getBytes())));
        System.out.println("Header set successfully");
    }
};

ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
        .configurator(configurator)
        .build();


endpoint is the callback handler. Once the session is established with the service, we send a message using the session object. The onMessage() method should be overridden to receive messages.

Endpoint endpoint = new Endpoint() {
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        // Register a handler for incoming text messages
        session.addMessageHandler(new MessageHandler.Whole<String>() {
            @Override
            public void onMessage(String content) {
                System.out.println("Received message: " + content);
            }
        });
        try {
            // msg is the text to send, declared elsewhere as an effectively final String
            System.out.println("Sending message to endpoint: " + msg);
            System.out.println("Session Id:: " + session.getId());
            session.getBasicRemote().sendText(msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};
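
Putting the pieces above together, a minimal standalone client might look like the following sketch. The URI, username, and password are placeholders, msg is the text to send, and the short sleep at the end is only there to keep the demo JVM alive long enough to receive a reply. Besides the javax.websocket-api jar, a client runtime implementation (for example Tyrus, the reference implementation) is needed on the classpath.

import java.net.URI;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;

import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

public class SecureWebSocketClient {

    public static void main(String[] args) throws Exception {
        final String msg = "hello";

        // Add the Basic Authentication header before the handshake request is sent
        ClientEndpointConfig.Configurator configurator = new ClientEndpointConfig.Configurator() {
            @Override
            public void beforeRequest(Map<String, List<String>> headers) {
                String credentials = "username:password"; // placeholder credentials
                headers.put("Authorization",
                        Arrays.asList("Basic " + Base64.getEncoder().encodeToString(credentials.getBytes())));
            }
        };

        ClientEndpointConfig clientConfig = ClientEndpointConfig.Builder.create()
                .configurator(configurator)
                .build();

        // Callback handler: send a message once the session opens and print any reply
        Endpoint endpoint = new Endpoint() {
            @Override
            public void onOpen(Session session, EndpointConfig config) {
                session.addMessageHandler(new MessageHandler.Whole<String>() {
                    @Override
                    public void onMessage(String content) {
                        System.out.println("Received message: " + content);
                    }
                });
                try {
                    session.getBasicRemote().sendText(msg);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        Session session = container.connectToServer(endpoint, clientConfig,
                new URI("wss://hostname:port/demo")); // placeholder URI

        Thread.sleep(5000); // give the server time to respond before closing
        session.close();
    }
}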

Thursday, October 19, 2017

Connect to Cassandra from RStudio

To connect to Cassandra from RStudio, the RJDBC library can be used.

Prerequisites:
Install the RJDBC library:
install.packages("RJDBC")


Place the cassandra-jdbc.jar library in the Cassandra libraries folder. In my environment it is placed under '/usr/share/dse/cassandra/lib/'.

Make sure the Thrift protocol is enabled on the Cassandra cluster:
nodetool statusthrift


If Thrift is disabled, it can be enabled with the following command:

nodetool enablethrift


R Script:

library(RJDBC)
# Load the Cassandra JDBC driver together with all jars in the Cassandra lib folder
cassdrv <- JDBC("org.apache.cassandra.cql.jdbc.CassandraDriver",
                list.files("/usr/share/dse/cassandra/lib/", pattern = "jar$", full.names = TRUE))
# Connect to the 'test' keyspace over the Thrift port (9160)
casscon <- dbConnect(cassdrv, "jdbc:cassandra://localhost:9160/test")
res <- dbGetQuery(casscon, "select * from emp")

Tuesday, October 17, 2017

Java Custom Accumulators implementation to collect bad records

Accumulators:

During transformations in Spark we often encounter a problem: we can use variables defined outside the function that we pass to map() or filter(), but we cannot pass data from the function back to the driver. Accumulators, which are shared variables across the Spark cluster, solve this problem.

Accumulators work as follows:
  • We create them in the driver by calling the SparkContext.accumulator(initialValue) method, which produces an accumulator holding an initial value. The return type is an org.apache.spark.Accumulator[T] object, where T is the type of initialValue.
  • Worker code in Spark closures can add to the accumulator with its += method (or add in Java).
  • The driver program can call the value property on the accumulator to access its value (or call value() and setValue() in Java).

Spark’s basic built-in accumulator type is integers (Accumulator[Int]) with addition; out of the box, Spark also supports accumulators of type Double, Long, and Float. In addition to these, Spark includes an API to define custom accumulator types.
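
For example, counting with a built-in integer accumulator follows the create/add/read pattern described above. This is only a minimal sketch: it assumes an existing JavaSparkContext named sc (as in the full example further below), the sample input is made up, and it needs org.apache.spark.Accumulator, org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.function.VoidFunction and java.util.Arrays on the import list.

final Accumulator<Integer> blankLines = sc.accumulator(0);   // created in the driver

JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "", "b", ""));
lines.foreach(new VoidFunction<String>() {
  private static final long serialVersionUID = 1L;

  @Override
  public void call(String line) throws Exception {
    if (line.isEmpty()) {
      blankLines.add(1);   // worker side: add to the accumulator
    }
  }
});

// driver side: read the accumulated value after the action (foreach) has run
System.out.println("Blank lines: " + blankLines.value());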

Custom accumulators need to extend AccumulatorParam, which is covered in the Spark API documentation. Beyond adding to a numeric value, we can use any operation for add, provided that operation is commutative and associative.

I have a requirement where I have data in a file and I want to collect the records whose second field is longer than 10 characters.

First, I create a custom accumulator implementing org.apache.spark.AccumulatorParam:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.AccumulatorParam;

public class RecordAccmulator implements AccumulatorParam<Map<String, String>> {

  private static final long serialVersionUID = 1L;

  // Merge two partial results coming from different partitions
  @Override
  public Map<String, String> addInPlace(Map<String, String> arg0, Map<String, String> arg1) {
    Map<String, String> map = new HashMap<>();
    map.putAll(arg0);
    map.putAll(arg1);
    return map;
  }

  // The "zero" (identity) value for the merge operation: an empty map
  @Override
  public Map<String, String> zero(Map<String, String> arg0) {
    return new HashMap<>();
  }

  // Add a single value produced on a worker to the accumulated map
  @Override
  public Map<String, String> addAccumulator(Map<String, String> arg0, Map<String, String> arg1) {
    return addInPlace(arg0, arg1);
  }
}


Merging maps with putAll is associative (and commutative here as long as each collected record has a distinct key), so the requirement above is satisfied. I use the custom accumulator in my business class:

JavaSparkContext sc = SparkUtils.createSparkContext(MyTest.class.getName(), "local[*]");
SQLContext hiveContext = SparkUtils.getSQLContext(sc);

JavaRDD<String> file = sc.textFile("inputfile.txt");
logger.info("File Record Count:: " + file.count());

// Register the custom accumulator with an empty map as the initial value
Accumulator<Map<String, String>> accm = sc.accumulator(new HashMap<>(), new RecordAccmulator());

JavaPairRDD<String, String> filePair = file.mapToPair(new PairFunction<String, String, String>() {

  private static final long serialVersionUID = 1L;

  @Override
  public Tuple2<String, String> call(String t) throws Exception {
    String[] str = StringUtils.split(t, ":");

    // Collect records whose second field is longer than 10 characters
    if (str[1].length() > 10) {
      Map<String, String> map = new HashMap<>();
      map.put(str[0], str[1]);
      accm.add(map);
    }
    return new Tuple2<String, String>(str[0], str[1]);
  }
});

// The accumulator is only reliably populated after an action (here count()) has executed
logger.info("Pair Count:: " + filePair.count());
logger.info("Accumulator Values:: " + accm.value());