Proverbial Count

The proverbial counting example: a Spark word count in Java, run in local mode.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class App {
    public static void main(String[] args) {
        // Local mode
        SparkConf sparkConf = new SparkConf().setAppName("HelloWorld").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // Read the input file and split each line into words
        JavaRDD<String> rdd = ctx.textFile("/home/Development/MarsWorkspace/third/resources/Forgiveness1.rtf");
        JavaRDD<String> words = rdd.flatMap(line -> Arrays.asList(line.split(" ")));

        // Pair each word with 1, then sum the counts per word
        JavaPairRDD<String, Integer> counts = words.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
                .reduceByKey((x, y) -> x + y);
        counts.saveAsTextFile("/home/Development/MarsWorkspace/third/resources/out.txt");

        ctx.stop();
    }
}
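
A note on versions: the flatMap call above compiles against Spark 1.x, where the Java FlatMapFunction returns an Iterable. If you build against Spark 2.x instead, the function must return an Iterator, so that one line would look roughly like this (a sketch of just the changed line, assuming a Spark 2.x dependency):

// Spark 2.x: FlatMapFunction.call returns an Iterator rather than an Iterable
JavaRDD<String> words = rdd.flatMap(line -> Arrays.asList(line.split(" ")).iterator());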

Hadoop Setup

Step 1: Install Java or validate Java install

Step 2: Create user and group in the system

sudo addgroup hadoop
[sudo] password for xxx:
Adding group `hadoop' (GID 1001) ...
Done.

sudo adduser --ingroup hadoop hduser
Adding user `hduser' ...
Adding new user `hduser' (1001) with group `hadoop' ...
Creating home directory `/home/hduser' ...
Copying files from `/etc/skel' ...

Step 3: Check whether SSH is installed (which ssh, which sshd); if not, install it (sudo apt-get install ssh)

Step 4: Create SSH keys

su - hduser
Password:
hduser@@@@-Lenovo-Z50-70:~$ ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub.
The key fingerprint is:
8c:0d:ca:68:6e:f0:91:f5:ba:f0:a3:d9:a5:b0:7f:71 hduser@@@@-Lenovo-Z50-70
The key's randomart image is:
+---[RSA 2048]----+
|                 |
|                 |
|    . .          |
|   = o =         |
|. = o o S        |
| = . .. E        |
|  * . .o         |
| . B.+.          |
|  +o*o           |
+-----------------+

$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

Step 5: Check that SSH works.

$ ssh localhost
The authenticity of host 'localhost (127.0.0.1)' can't be established.
ECDSA key fingerprint is 91:51:2b:33:cd:bc:65:45:ca:4c:e2:51:9d:1e:93:f2.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
Welcome to Ubuntu 15.04 (GNU/Linux 3.16.0-55-generic x86_64)

Step 6: Get the binaries from the Hadoop site (as of 02/19 the current version is 2.7, but I got 2.6 for other reasons) and go through the setup process.

$ cd /usr/local
$ sudo tar xzf hadoop-2.6.4.tar.gz
$ sudo mv hadoop-2.6.4 hadoop
$ sudo chown -R hduser:hadoop hadoop

$ vi .bashrc
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"


$ which javac
/usr/bin/javac

$ readlink -f /usr/bin/javac
/usr/lib/jvm/java-8-oracle/bin/javac

$ vi /usr/local/hadoop/etc/hadoop/hadoop-env.sh
# Update JAVA_HOME
# The java implementation to use.
export JAVA_HOME=/usr/lib/jvm/java-8-oracle

$ sudo mkdir -p /app/hadoop/tmp
$ sudo chown hduser:hadoop /app/hadoop/tmp

Next, update core-site.xml.
The core-site.xml file tells the Hadoop daemons where the NameNode runs in the cluster. It contains the configuration settings for Hadoop Core, such as I/O settings that are common to HDFS and MapReduce.

There are three HDFS properties whose default values are derived from hadoop.tmp.dir:

1. dfs.name.dir: directory where the namenode stores its metadata; default value ${hadoop.tmp.dir}/dfs/name.
2. dfs.data.dir: directory where HDFS data blocks are stored; default value ${hadoop.tmp.dir}/dfs/data.
3. fs.checkpoint.dir: directory where the secondary namenode stores its checkpoints; default value ${hadoop.tmp.dir}/dfs/namesecondary.

<configuration>
   <property>
       <name>hadoop.tmp.dir</name>
       <value>/app/hadoop/tmp</value>
       <description>A base for other temporary directories.</description>
    </property>

    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:54310</value>
        <description>The name of the default file system.  A URI whose
        scheme and authority determine the FileSystem implementation.  The
        uri's scheme determines the config property (fs.SCHEME.impl) naming
        the FileSystem implementation class.  The uri's authority is used to
        determine the host, port, etc. for a filesystem.</description>
    </property>
</configuration>

Next, edit mapred-site.xml, which specifies the framework used for MapReduce. Add the following to this XML file:

<configuration>
 <property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
 </property>
</configuration>

Next, edit hdfs-site.xml to specify the namenode and the datanode directories. Create the two directories that will hold the namenode and datanode data for this Hadoop installation:

$ sudo mkdir -p /usr/local/hadoop_store/hdfs/namenode
$ sudo mkdir -p /usr/local/hadoop_store/hdfs/datanode
$ sudo chown -R hduser:hadoop /usr/local/hadoop_store
$ vi /usr/local/hadoop/etc/hadoop/hdfs-site.xml
<configuration>
 <property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
  </description>
 </property>
 <property>
   <name>dfs.namenode.name.dir</name>
   <value>file:/usr/local/hadoop_store/hdfs/namenode</value>
 </property>
 <property>
   <name>dfs.datanode.data.dir</name>
   <value>file:/usr/local/hadoop_store/hdfs/datanode</value>
 </property>
</configuration>

Next step: format the HDFS filesystem (namenode).

hduser@@@@-Lenovo-Z50-70:~$ hadoop namenode -format
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

16/02/21 09:47:52 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = @@@-Lenovo-Z50-70/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 2.6.4
STARTUP_MSG:   classpath = ...
STARTUP_MSG:   build = https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6; compiled by 'jenkins' on 2016-02-12T09:45Z
STARTUP_MSG:   java = 1.8.0_66
************************************************************/
16/02/21 09:47:52 INFO namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
16/02/21 09:47:52 INFO namenode.NameNode: createNameNode [-format]
16/02/21 09:47:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Formatting using clusterid: CID-4ad4548d-6cb6-492c-9d38-ee5f665143c2
16/02/21 09:47:53 INFO namenode.FSNamesystem: No KeyProvider found.
16/02/21 09:47:53 INFO namenode.FSNamesystem: fsLock is fair:true
16/02/21 09:47:53 INFO blockmanagement.DatanodeManager: dfs.block.invalidate.limit=1000
16/02/21 09:47:53 INFO blockmanagement.DatanodeManager: dfs.namenode.datanode.registration.ip-hostname-check=true
16/02/21 09:47:53 INFO blockmanagement.BlockManager: dfs.namenode.startup.delay.block.deletion.sec is set to 000:00:00:00.000
16/02/21 09:47:53 INFO blockmanagement.BlockManager: The block deletion will start around 2016 Feb 21 09:47:53
16/02/21 09:47:53 INFO util.GSet: Computing capacity for map BlocksMap
16/02/21 09:47:53 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:53 INFO util.GSet: 2.0% max memory 889 MB = 17.8 MB
16/02/21 09:47:53 INFO util.GSet: capacity      = 2^21 = 2097152 entries
16/02/21 09:47:53 INFO blockmanagement.BlockManager: dfs.block.access.token.enable=false
16/02/21 09:47:53 INFO blockmanagement.BlockManager: defaultReplication         = 1
16/02/21 09:47:53 INFO blockmanagement.BlockManager: maxReplication             = 512
16/02/21 09:47:53 INFO blockmanagement.BlockManager: minReplication             = 1
16/02/21 09:47:53 INFO blockmanagement.BlockManager: maxReplicationStreams      = 2
16/02/21 09:47:53 INFO blockmanagement.BlockManager: replicationRecheckInterval = 3000
16/02/21 09:47:53 INFO blockmanagement.BlockManager: encryptDataTransfer        = false
16/02/21 09:47:53 INFO blockmanagement.BlockManager: maxNumBlocksToLog          = 1000
16/02/21 09:47:54 INFO namenode.FSNamesystem: fsOwner             = hduser (auth:SIMPLE)
16/02/21 09:47:54 INFO namenode.FSNamesystem: supergroup          = supergroup
16/02/21 09:47:54 INFO namenode.FSNamesystem: isPermissionEnabled = true
16/02/21 09:47:54 INFO namenode.FSNamesystem: HA Enabled: false
16/02/21 09:47:54 INFO namenode.FSNamesystem: Append Enabled: true
16/02/21 09:47:54 INFO util.GSet: Computing capacity for map INodeMap
16/02/21 09:47:54 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:54 INFO util.GSet: 1.0% max memory 889 MB = 8.9 MB
16/02/21 09:47:54 INFO util.GSet: capacity      = 2^20 = 1048576 entries
16/02/21 09:47:54 INFO namenode.NameNode: Caching file names occuring more than 10 times
16/02/21 09:47:54 INFO util.GSet: Computing capacity for map cachedBlocks
16/02/21 09:47:54 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:54 INFO util.GSet: 0.25% max memory 889 MB = 2.2 MB
16/02/21 09:47:54 INFO util.GSet: capacity      = 2^18 = 262144 entries
16/02/21 09:47:54 INFO namenode.FSNamesystem: dfs.namenode.safemode.threshold-pct = 0.9990000128746033
16/02/21 09:47:54 INFO namenode.FSNamesystem: dfs.namenode.safemode.min.datanodes = 0
16/02/21 09:47:54 INFO namenode.FSNamesystem: dfs.namenode.safemode.extension     = 30000
16/02/21 09:47:54 INFO namenode.FSNamesystem: Retry cache on namenode is enabled
16/02/21 09:47:54 INFO namenode.FSNamesystem: Retry cache will use 0.03 of total heap and retry cache entry expiry time is 600000 millis
16/02/21 09:47:54 INFO util.GSet: Computing capacity for map NameNodeRetryCache
16/02/21 09:47:54 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:54 INFO util.GSet: 0.029999999329447746% max memory 889 MB = 273.1 KB
16/02/21 09:47:54 INFO util.GSet: capacity      = 2^15 = 32768 entries
16/02/21 09:47:54 INFO namenode.NNConf: ACLs enabled? false
16/02/21 09:47:54 INFO namenode.NNConf: XAttrs enabled? true
16/02/21 09:47:54 INFO namenode.NNConf: Maximum size of an xattr: 16384
16/02/21 09:47:54 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1807438273-127.0.1.1-1456069674855
16/02/21 09:47:55 INFO common.Storage: Storage directory /usr/local/hadoop_store/hdfs/namenode has been successfully formatted.
16/02/21 09:47:55 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
16/02/21 09:47:55 INFO util.ExitUtil: Exiting with status 0
16/02/21 09:47:55 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at @@@-Lenovo-Z50-70/127.0.1.1
************************************************************/
hduser@@@@-Lenovo-Z50-70:~$

Next, start Hadoop:

hduser@@@@-Lenovo-Z50-70:/usr/local/hadoop/sbin$ ./start-all.sh
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
16/02/21 10:23:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hduser-namenode-@@@-Lenovo-Z50-70.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-@@@-Lenovo-Z50-70.out
Starting secondary namenodes [0.0.0.0]
The authenticity of host '0.0.0.0 (0.0.0.0)' can't be established.
ECDSA key fingerprint is 91:51:2b:33:cd:bc:65:45:ca:4c:e2:51:9d:1e:93:f2.
Are you sure you want to continue connecting (yes/no)? yes
0.0.0.0: Warning: Permanently added '0.0.0.0' (ECDSA) to the list of known hosts.
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-hduser-secondarynamenode-@@@-Lenovo-Z50-70.out
16/02/21 10:23:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn-hduser-resourcemanager-@@@-Lenovo-Z50-70.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-@@@-Lenovo-Z50-70.out

Check that everything is running with jps:

hduser@@@@-Lenovo-Z50-70:/usr/local/hadoop/sbin$ jps
6624 NodeManager
6016 NameNode
6162 DataNode
6503 ResourceManager
6345 SecondaryNameNode
6941 Jps

Access the web interface at http://localhost:50070.
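
With HDFS up and running, the word-count program from the top of this post can read its input from HDFS instead of the local filesystem. Here is a minimal sketch, assuming a file has already been copied into HDFS at the hypothetical path /user/hduser/input.txt and using the fs.default.name (hdfs://localhost:54310) configured in core-site.xml above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsLineCount {
    public static void main(String[] args) {
        // Local Spark, reading from the single-node HDFS set up above
        SparkConf sparkConf = new SparkConf().setAppName("HdfsLineCount").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        // Hypothetical input path; the host:port matches fs.default.name
        JavaRDD<String> lines = ctx.textFile("hdfs://localhost:54310/user/hduser/input.txt");
        System.out.println("Number of lines: " + lines.count());

        ctx.stop();
    }
}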

Scala Hello World in Scala IDE

After downloading Scala IDE and installing https://github.com/sonatype/m2eclipse-scala, I still ran into the issue of not finding the right archetype. Solved it by adding a remote catalog: http://repo1.maven.org/maven2/archetype-catalog.xml

Here is the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.vitarkah.scala</groupId>
  <artifactId>first</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2015</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.10.6</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
  </properties>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalactic</groupId>
      <artifactId>scalactic_2.10</artifactId>
      <version>2.2.6</version>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.10</artifactId>
      <version>2.2.6</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- mixed scala/java compile -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
      <!-- for fatjar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>fully.qualified.MainClass</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!--This plugin's configuration is used to store Eclipse m2e settings
          only. It has no influence on the Maven build itself. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>
                      maven-scala-plugin
                    </artifactId>
                    <versionRange>
                      [2.15.2,)
                    </versionRange>
                    <goals>
                      <goal>compile</goal>
                      <goal>testCompile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <execute></execute>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

</project>

Here is the “hello world” Scala app

package com.vitarkah.scala.first
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/**
 * @author ${user.name}
 */

object App {

  def main(args: Array[String]) {

    // initialise spark context
    val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
    val sc = new SparkContext(conf)

    // do stuff
    println("Hello, world!")

    // terminate spark context
    sc.stop()

  }

}

Here is the output

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/02/14 21:04:59 INFO SparkContext: Running Spark version 1.6.0
16/02/14 21:04:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/14 21:05:00 WARN Utils: Your hostname, vichu-Lenovo-Z50-70 resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlan0)
16/02/14 21:05:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/02/14 21:05:00 INFO SecurityManager: Changing view acls to: vichu
16/02/14 21:05:00 INFO SecurityManager: Changing modify acls to: vichu
16/02/14 21:05:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vichu); users with modify permissions: Set(vichu)
16/02/14 21:05:00 INFO Utils: Successfully started service 'sparkDriver' on port 51488.
16/02/14 21:05:00 INFO Slf4jLogger: Slf4jLogger started
16/02/14 21:05:01 INFO Remoting: Starting remoting
16/02/14 21:05:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.140:38851]
16/02/14 21:05:01 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 38851.
16/02/14 21:05:01 INFO SparkEnv: Registering MapOutputTracker
16/02/14 21:05:01 INFO SparkEnv: Registering BlockManagerMaster
16/02/14 21:05:01 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-37186a0b-a60a-41d7-8561-fd4054ef67f7
16/02/14 21:05:01 INFO MemoryStore: MemoryStore started with capacity 1088.6 MB
16/02/14 21:05:01 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/14 21:05:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/02/14 21:05:01 INFO SparkUI: Started SparkUI at http://192.168.1.140:4040
16/02/14 21:05:01 INFO Executor: Starting executor ID driver on host localhost
16/02/14 21:05:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 50884.
16/02/14 21:05:01 INFO NettyBlockTransferService: Server created on 50884
16/02/14 21:05:01 INFO BlockManagerMaster: Trying to register BlockManager
16/02/14 21:05:01 INFO BlockManagerMasterEndpoint: Registering block manager localhost:50884 with 1088.6 MB RAM, BlockManagerId(driver, localhost, 50884)
16/02/14 21:05:01 INFO BlockManagerMaster: Registered BlockManager

Hello, world! <<--- There it is!


16/02/14 21:05:01 INFO SparkUI: Stopped Spark web UI at http://192.168.1.140:4040
16/02/14 21:05:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/02/14 21:05:01 INFO MemoryStore: MemoryStore cleared
16/02/14 21:05:01 INFO BlockManager: BlockManager stopped
16/02/14 21:05:01 INFO BlockManagerMaster: BlockManagerMaster stopped
16/02/14 21:05:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/02/14 21:05:01 INFO SparkContext: Successfully stopped SparkContext
16/02/14 21:05:01 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/02/14 21:05:01 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/02/14 21:05:01 INFO ShutdownHookManager: Shutdown hook called
16/02/14 21:05:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-44aa9e76-6aec-4162-8856-c1605c8f060c

Spark Java HelloWorld

1. First, download and install Eclipse Mars for Ubuntu 15 (pretty straightforward from here)

2. Create a Maven project in Eclipse. Straightforward.

3. Add the Spark dependency

    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.1</version>
      <scope>provided</scope>
    </dependency>

4. Hello World in Java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * Hello world!
 *
 */

public class App {
  public static void main(String[] args) {

    // Local mode
    SparkConf sparkConf = new SparkConf().setAppName("HelloWorld").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    String[] arr = new String[] { "A1", "B2", "C3", "D4", "F5" };
    List<String> inputList = Arrays.asList(arr);
    JavaRDD<String> inputRDD = ctx.parallelize(inputList);
    inputRDD.foreach(new VoidFunction<String>() {

      public void call(String input) throws Exception {
        System.out.println(input);

      }
    });

  }
}
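
Since VoidFunction has a single abstract method, the anonymous class above can also be written as a Java 8 lambda. The following line is an equivalent sketch, assuming the project is compiled with Java 8:

// Same foreach, expressed as a lambda instead of an anonymous VoidFunction
inputRDD.foreach(input -> System.out.println(input));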

5. Output

16/02/13 07:27:07 WARN util.Utils: Your hostname, vichu-Lenovo-Z50-70 resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlan0)
16/02/13 07:27:07 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/02/13 07:27:07 INFO spark.SecurityManager: Changing view acls to: vichu
16/02/13 07:27:07 INFO spark.SecurityManager: Changing modify acls to: vichu
16/02/13 07:27:07 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vichu); users with modify permissions: Set(vichu)
16/02/13 07:27:07 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/02/13 07:27:07 INFO Remoting: Starting remoting
16/02/13 07:27:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.140:39886]
16/02/13 07:27:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.1.140:39886]
16/02/13 07:27:07 INFO util.Utils: Successfully started service 'sparkDriver' on port 39886.
16/02/13 07:27:07 INFO spark.SparkEnv: Registering MapOutputTracker
16/02/13 07:27:07 INFO spark.SparkEnv: Registering BlockManagerMaster
16/02/13 07:27:07 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20160213072707-5f39
16/02/13 07:27:08 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 56037.
16/02/13 07:27:08 INFO network.ConnectionManager: Bound socket to port 56037 with id = ConnectionManagerId(192.168.1.140,56037)
16/02/13 07:27:08 INFO storage.MemoryStore: MemoryStore started with capacity 945.8 MB
16/02/13 07:27:08 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/02/13 07:27:08 INFO storage.BlockManagerMasterActor: Registering block manager 192.168.1.140:56037 with 945.8 MB RAM, BlockManagerId(<driver>, 192.168.1.140, 56037, 0)
16/02/13 07:27:08 INFO storage.BlockManagerMaster: Registered BlockManager
16/02/13 07:27:08 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e86643a4-7d63-4b1b-9b3c-95178861aa1e
16/02/13 07:27:08 INFO spark.HttpServer: Starting HTTP Server
16/02/13 07:27:08 INFO server.Server: jetty-8.1.14.v20131031
16/02/13 07:27:08 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44346
16/02/13 07:27:08 INFO util.Utils: Successfully started service 'HTTP file server' on port 44346.
16/02/13 07:27:08 INFO server.Server: jetty-8.1.14.v20131031
16/02/13 07:27:08 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/02/13 07:27:08 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
16/02/13 07:27:08 INFO ui.SparkUI: Started SparkUI at http://192.168.1.140:4040
16/02/13 07:27:08 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.1.140:39886/user/HeartbeatReceiver
16/02/13 07:27:08 INFO spark.SparkContext: Starting job: foreach at App.java:24
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Got job 0 (foreach at App.java:24) with 1 output partitions (allowLocal=false)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Final stage: Stage 0(foreach at App.java:24)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Missing parents: List()
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at App.java:23), which has no missing parents
16/02/13 07:27:08 INFO storage.MemoryStore: ensureFreeSpace(1504) called with curMem=0, maxMem=991753666
16/02/13 07:27:08 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1504.0 B, free 945.8 MB)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at parallelize at App.java:23)
16/02/13 07:27:08 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/02/13 07:27:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1224 bytes)
16/02/13 07:27:08 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
A1
B2
C3
D4
F5
16/02/13 07:27:08 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 585 bytes result sent to driver
16/02/13 07:27:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 39 ms on localhost (1/1)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Stage 0 (foreach at App.java:24) finished in 0.054 s
16/02/13 07:27:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/02/13 07:27:08 INFO spark.SparkContext: Job finished: foreach at App.java:24, took 0.240135089 s

Installing Apache Spark

First things first: you have to have the latest Java.

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer

Next, install Scala. The latest version as of today is 2.11.7, but I ran into a brick wall with that version, so I am using 2.10.6.

$ wget http://www.scala-lang.org/files/archive/scala-2.10.6.deb
$ sudo dpkg -i scala-2.10.6.deb

$ scala
Welcome to Scala version 2.10.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66).
Type in expressions to have them evaluated.
Type :help for more information.

scala> :q

$ sudo apt-get install git
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0.tgz
$ tar xvf spark-1.6.0.tgz
$ rm spark-1.6.0.tgz
$ cd spark-1.6.0/
$ build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.4 -DskipTests clean package

Notes:
:q quits the Scala shell.
Spark does not yet support its JDBC component for Scala 2.11.

./run-example SparkPi
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/02/13 06:56:35 INFO SparkContext: Running Spark version 1.6.0
16/02/13 06:56:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/13 06:56:35 WARN Utils: Your hostname, .... resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlan0)
16/02/13 06:56:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/02/13 06:56:35 INFO SecurityManager: Changing view acls to: ...
16/02/13 06:56:35 INFO SecurityManager: Changing modify acls to: ....
16/02/13 06:56:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(...); users with modify permissions: Set(...)
16/02/13 06:56:36 INFO Utils: Successfully started service 'sparkDriver' on port 34966.
16/02/13 06:56:36 INFO Slf4jLogger: Slf4jLogger started
16/02/13 06:56:36 INFO Remoting: Starting remoting
16/02/13 06:56:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.140:49553]
16/02/13 06:56:36 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49553.
16/02/13 06:56:36 INFO SparkEnv: Registering MapOutputTracker
16/02/13 06:56:36 INFO SparkEnv: Registering BlockManagerMaster
16/02/13 06:56:36 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1e81dc84-91ff-4503-ab3e-7fa8adbac78e
16/02/13 06:56:36 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/02/13 06:56:36 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/13 06:56:36 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/02/13 06:56:36 INFO SparkUI: Started SparkUI at http://192.168.1.140:4040
16/02/13 06:56:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/httpd-6774316e-0973-43c7-b58d-3cc0a1991f95
16/02/13 06:56:36 INFO HttpServer: Starting HTTP Server
16/02/13 06:56:36 INFO Utils: Successfully started service 'HTTP file server' on port 42625.
16/02/13 06:56:37 INFO SparkContext: Added JAR file:/home/.../spark/spark-1.6.0/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.6.4.jar at http://192.168.1.140:42625/jars/spark-examples-1.6.0-hadoop2.6.4.jar with timestamp 1455368197141
16/02/13 06:56:37 INFO Executor: Starting executor ID driver on host localhost
16/02/13 06:56:37 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58239.
16/02/13 06:56:37 INFO NettyBlockTransferService: Server created on 58239
16/02/13 06:56:37 INFO BlockManagerMaster: Trying to register BlockManager
16/02/13 06:56:37 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58239 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58239)
16/02/13 06:56:37 INFO BlockManagerMaster: Registered BlockManager
16/02/13 06:56:37 INFO SparkContext: Starting job: reduce at SparkPi.scala:36
16/02/13 06:56:37 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) with 2 output partitions
16/02/13 06:56:37 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:36)
16/02/13 06:56:37 INFO DAGScheduler: Parents of final stage: List()
16/02/13 06:56:37 INFO DAGScheduler: Missing parents: List()
16/02/13 06:56:37 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
16/02/13 06:56:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1888.0 B, free 1888.0 B)
16/02/13 06:56:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1202.0 B, free 3.0 KB)
16/02/13 06:56:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58239 (size: 1202.0 B, free: 511.1 MB)
16/02/13 06:56:38 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/02/13 06:56:38 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
16/02/13 06:56:38 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/02/13 06:56:38 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2156 bytes)
16/02/13 06:56:38 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2156 bytes)
16/02/13 06:56:38 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/02/13 06:56:38 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/02/13 06:56:38 INFO Executor: Fetching http://192.168.1.140:42625/jars/spark-examples-1.6.0-hadoop2.6.4.jar with timestamp 1455368197141
16/02/13 06:56:38 INFO Utils: Fetching http://192.168.1.140:42625/jars/spark-examples-1.6.0-hadoop2.6.4.jar to /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/userFiles-8a089d63-89cb-4bf4-a116-b1dcfb15e6c1/fetchFileTemp3377846363451941090.tmp
16/02/13 06:56:38 INFO Executor: Adding file:/tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/userFiles-8a089d63-89cb-4bf4-a116-b1dcfb15e6c1/spark-examples-1.6.0-hadoop2.6.4.jar to class loader
16/02/13 06:56:38 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1031 bytes result sent to driver
16/02/13 06:56:38 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1031 bytes result sent to driver
16/02/13 06:56:38 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 762 ms on localhost (1/2)
16/02/13 06:56:38 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 736 ms on localhost (2/2)
16/02/13 06:56:38 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/02/13 06:56:38 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) finished in 0.777 s
16/02/13 06:56:38 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 1.003757 s

Pi is roughly 3.13756 <<-- There it is!


16/02/13 06:56:38 INFO SparkUI: Stopped Spark web UI at http://192.168.1.140:4040
16/02/13 06:56:38 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/02/13 06:56:38 INFO MemoryStore: MemoryStore cleared
16/02/13 06:56:38 INFO BlockManager: BlockManager stopped
16/02/13 06:56:38 INFO BlockManagerMaster: BlockManagerMaster stopped
16/02/13 06:56:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/02/13 06:56:38 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/02/13 06:56:38 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/02/13 06:56:38 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/02/13 06:56:39 INFO SparkContext: Successfully stopped SparkContext
16/02/13 06:56:39 INFO ShutdownHookManager: Shutdown hook called
16/02/13 06:56:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f
16/02/13 06:56:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/httpd-6774316e-0973-43c7-b58d-3cc0a1991f95

Install SBT. The most current version is 0.13.9

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt