AWS Takeaways

Recently, here at Hyla, my team and I pushed forward and got the entire trade-in and processing platform into AWS. Along the way we made a few application-related improvements to leverage the benefits of the target environment. A recent LinkedIn post resulted in a few folks asking me about the learnings, so I thought I would provide a brief write-up on our experience pushing through this change, hoping that it helps others who are on a similar journey.

#1. Change buy-in – Moving from a legacy environment to any cloud environment is not a trivial activity. Hence, it is important that the change is not treated as a hobby project: it needs support from senior leadership, acceptance from the business that it is an important strategic initiative aligned with business goals, and recognition from corporate finance of the CapEx to OpEx shift. With the above in place, on the execution side, it is important to ensure that there is an excellent project management team, commitment from the execution staff and, most importantly, good old-fashioned grit to see things through.

#2. Leading change – The mechanics of pushing through the change boil down to (1) leading the change from the front, (2) influencing the team to push through the change, or (3) issuing a top-down edict to execute the change. However, any change implementation comes with a degree of Fear, Uncertainty and Doubt (FUD), and the leadership stamina needed to see the change through is directly proportional to the intensity of that FUD. Accordingly, the appropriate strategy needs to be applied to push through the change. Leading from the front involves getting dirt under the nails, so if the leader has the ability and bandwidth to embrace this strategy, the probability of success is relatively high. However, if the change is executed by influence, or is being pushed top down, then the leader(s) must ensure that the team is adequately staffed and trained to successfully manage through the change.

#3. Architecture – While it sounds trite, it is important to pen down high-level and next-level architecture diagrams that include low-level details like the VPC, subnets for the various tiers, ACLs for the subnets, and security groups for EC2 including ports. It is important to keep in mind the various services that may be needed as well – for example, SSH (22), SMTP (25), Tomcat (8080) etc. – when designing the architecture. Using the architecture as the blueprint, CloudFormation or other scripts need to be written to build the infrastructure.
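As a sanity check against the diagram, the ports that are actually open can be pulled from the API and compared with what the architecture calls for. A minimal boto3 sketch, assuming a region and an expected-port list that are purely illustrative:

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # assumed region
expected_ports = {22, 25, 8080}  # ports called out in the architecture diagram

for sg in ec2.describe_security_groups()["SecurityGroups"]:
    for rule in sg["IpPermissions"]:
        port = rule.get("FromPort")  # absent for "all traffic" rules
        if port is not None and port not in expected_ports:
            print(sg["GroupId"], sg["GroupName"], "opens unexpected port", port)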

#4. Application State – When porting over legacy applications, this is one area that is most likely going to cause a lot of heartburn. The underlying issue is what Martin Fowler calls a “Snowflake Server”. This is where folks need to spend energy to decouple application state from the environment. One of the long poles happens to be property files. The best way to tackle this would be to pivot to something like Spring Cloud Config, ZooKeeper or Consul. However, due to timeline pressure, it may be hard to pivot, and in those cases S3 can be leveraged to store the application state and configuration files.
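For instance, at startup the application can pull its property file from S3 instead of reading it off the local disk. A minimal boto3 sketch; the bucket and key names are made up for illustration:

import boto3

s3 = boto3.client("s3")

# Hypothetical bucket/key; in practice these would be split per environment (dev/qa/uat/prod).
obj = s3.get_object(Bucket="example-app-config", Key="prod/application.properties")
properties = obj["Body"].read().decode("utf-8")

for line in properties.splitlines():
    if line and not line.startswith("#"):
        key, _, value = line.partition("=")
        print(key.strip(), "=", value.strip())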

#5. AWS Accounts – Before building anything, it is important to think through accounts and hierarchies. One could design a fine-grained hierarchy or stay coarse-grained, and the final design needs to be driven by department or company objectives. In our case, we just needed four separate accounts, one for each environment – prod, uat, qa and dev. However, in a larger organization it may be wise to put deeper thought into account organization. This enables the ability to get billing information by account (it is also possible to get billing information using tags, hence the reason to think through beforehand how it needs to be set up).

#6. VPC and CIDR Ranges – It is equally important to put thought into segregating CIDR ranges based on environment and business domains. In our case, we had to go through a few iterations to pick the right CIDR ranges for dev, qa, uat and prod (iterations that could have been avoided had time been spent on this early on).

#7. Building up infrastructure – Building by hand through the console is great for learning. However, folks need to invest time and energy to build up the infrastructure using CloudFormation, Terraform or CloudFormation Template Generator (from Monsanto Engineering). In our case, we ended up using CloudFormation (after a very brief evaluation of the other two products). Once the scripts are in place, it is important to start treating these scripts as code, specifically, infrastructure as code. The idea that infrastructure is no different from the rest of the code base needs to get ingrained in the software organization's culture. In our case, the CloudFormation scripts are in Git and, going forward, changes to the environment will get no different treatment than changes to the code supporting our product suites.

#8. Service Limits – It is a good idea to be aware of what the service limits are and to request increases ahead of time. It is not ideal when an application under load, trying to scale up, hits a limit and breaks down; that would not yield an optimal experience, would it?
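A few of the classic EC2 limits are visible through the API, which makes it easy to keep an eye on them. A small boto3 sketch; the attribute names are the ones EC2 exposes, while the region is an assumption:

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # assumed region

attrs = ec2.describe_account_attributes(AttributeNames=["max-instances", "max-elastic-ips"])
for attr in attrs["AccountAttributes"]:
    values = [v["AttributeValue"] for v in attr["AttributeValues"]]
    print(attr["AttributeName"], "=", ", ".join(values))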

#9. Accessing EC2 – If set up right, only a few (very few) people will need SSH access to EC2. In fact, in a well automated state, even SSH access may not be required. One of the reasons developers need access to EC2 instances is to view application logs. The logs, however, can be piped to CloudWatch Logs, and if IAM is set up correctly, this should address the need to access logs for debugging purposes. Another strategy would be to send all the log data to Elasticsearch, which is actually the more ideal solution. This would not only enable enhanced search capabilities, but also open up opportunities to perform log analytics through Kibana.
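With the logs in CloudWatch Logs, a developer can pull recent entries without ever touching the instance. A minimal boto3 sketch; the log group name is a placeholder:

import time
import boto3

logs = boto3.client("logs", region_name="us-east-1")  # assumed region

# Hypothetical log group fed by the CloudWatch Logs agent on the app servers.
resp = logs.filter_log_events(
    logGroupName="/appserver/application.log",
    startTime=int((time.time() - 3600) * 1000),  # last hour, in epoch millis
    filterPattern="ERROR",
)
for event in resp["events"]:
    print(event["timestamp"], event["message"])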

#10. Static IPs – In the cloud environment, there is limited or no need for static IPs. However, this idea requires a little getting used to, especially when we have been used to fixed IPs throughout our software lives. In our case, only the NAT Gateways have Elastic IPs. Pretty much everything else in our environment has virtual IPs, and almost all of them are private too. The SSH bastions have public IPs, but they are not static. So if the CloudFormation script that was used to build up the bastions were deleted and rerun, the bastions would get new IPs. We felt that was OK given that only a few people had access.

#11. Private IPs – Almost all of our IPs are private and none of them is visible to the outside world. The only public IPs are for the NAT Gateways, the external facing ELBs and the bastions. One can reach the private IPs only from the bastion. Initially, this caused a bit of pain because every time we needed to SSH to an EC2 resource we had to figure out what its IP was, which meant logging into the console to look up the private IP. That required a few more clicks than before. However, with automation leveraging the AWS CLI (a sketch of the lookup is below), this problem is being aggressively tackled by our capable DevOps team.
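The lookup itself is a single call against the EC2 API. Here is a boto3 sketch that resolves private IPs from a Name tag; the tag value and region are illustrative:

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # assumed region

resp = ec2.describe_instances(
    Filters=[
        {"Name": "tag:Name", "Values": ["appserver"]},            # hypothetical tag value
        {"Name": "instance-state-name", "Values": ["running"]},   # running instances only
    ]
)
for reservation in resp["Reservations"]:
    for instance in reservation["Instances"]:
        print(instance["InstanceId"], instance.get("PrivateIpAddress"))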

#12. ASG – To scale, we had set up CPU high and low alarms. Here too, it is a good idea to put some thought into what the high and low thresholds should look like. This one we learnt by experience. At one point our application servers were thrashing pretty badly, and in the middle of debugging, a server would just power off. The shutdowns felt arbitrary, with no apparent reason. We went chasing our tail, thinking that the environment was “unstable” and suspecting something was wrong with the UserData part of the EC2 instances. In the end, it turned out that the high CPU alarm threshold was not right. The bar was too low, and when the application hit it, the ASG terminated the instance and replaced it with a new instance, which was then promptly terminated as well. Resetting the high CPU alarms for Auto Scaling brought stability and relief.
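For reference, this is roughly what defining a high CPU alarm against an Auto Scaling group looks like with boto3; the threshold, periods, ASG name and action ARN below are illustrative, not our production values:

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # assumed region

cloudwatch.put_metric_alarm(
    AlarmName="appserver-cpu-high",
    Namespace="AWS/EC2",
    MetricName="CPUUtilization",
    Dimensions=[{"Name": "AutoScalingGroupName", "Value": "appserver-asg"}],
    Statistic="Average",
    Period=300,               # evaluate 5-minute averages...
    EvaluationPeriods=2,      # ...over two consecutive periods
    Threshold=75.0,           # a bar set too low is exactly what bit us
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:autoscaling:..."],  # scaling policy ARN (elided)
)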

#13. Tags – Putting thought into tags is extremely important. Tags are free-form text, and hence it is important to establish a solid naming convention for all resources and diligently stick to it. This has the potential to become runaway and chaotic if not controlled from the get-go.
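One cheap way to keep the convention honest is a small audit script that flags resources missing the agreed-upon tags. A sketch, assuming 'Name' and 'Environment' are the mandated keys (that particular convention is only an example):

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # assumed region
required = {"Name", "Environment"}                  # example convention

for reservation in ec2.describe_instances()["Reservations"]:
    for instance in reservation["Instances"]:
        tags = {t["Key"] for t in instance.get("Tags", [])}
        missing = required - tags
        if missing:
            print(instance["InstanceId"], "is missing tags:", ", ".join(sorted(missing)))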

#14. SSL Termination – Terminating SSL at the ELB offloads the SSL overhead from the web servers. In addition, AWS provides nicely packaged predefined security policies which make security a breeze (for example, turning off TLS v1.0 is a walk in the park).

#15. RDS – Going down this route takes away a lot of the freedom that comes with, say, setting up Postgres (or MySQL) on EC2. AWS retains the true “superuser” rights, and the admin user is limited to a restricted set of privileges. For legacy applications this is another area where people may have to spend time cleaning up. Another neat thing about RDS is that encrypting data at rest is a breeze. However, it might be a good idea to generate a key in KMS and use it, rather than using the default one.
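As a rough sketch of that last point, a customer-managed KMS key can be created and handed to RDS when the instance is created; every identifier and size below is a placeholder:

import boto3

kms = boto3.client("kms", region_name="us-east-1")  # assumed region
rds = boto3.client("rds", region_name="us-east-1")

# Create a customer-managed key instead of relying on the default key.
key = kms.create_key(Description="example key for RDS encryption at rest")
key_id = key["KeyMetadata"]["KeyId"]

rds.create_db_instance(
    DBInstanceIdentifier="example-postgres",
    DBInstanceClass="db.m4.large",
    Engine="postgres",
    AllocatedStorage=100,
    MasterUsername="admin_user",
    MasterUserPassword="change-me-please",
    StorageEncrypted=True,
    KmsKeyId=key_id,
)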

#16. IAM Groups and Users – Time needs to be put into designing and building out IAM groups with appropriate sets of permissions. Users can then be assigned to the groups, which gives better control over limiting permissions as well as achieving a well thought out separation of responsibilities.
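In boto3 terms the group-based setup boils down to three calls; the group name, managed policy and user below are purely illustrative:

import boto3

iam = boto3.client("iam")

# Example group for folks who only need to read CloudWatch Logs.
iam.create_group(GroupName="log-readers")
iam.attach_group_policy(
    GroupName="log-readers",
    PolicyArn="arn:aws:iam::aws:policy/CloudWatchLogsReadOnlyAccess",
)
iam.add_user_to_group(GroupName="log-readers", UserName="example.developer")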

#17. Getting Help – The free support through the AWS Forums is totally useless; questions go unanswered. Ponying up $ for paid support is well worth it (because of the reasons mentioned in #18).

#18. Still Not Perfect – AWS is not yet perfect. For instance, during our production DB build out, the read only replica failed for an unknown reason. It took multiple attempts, with some help from AWS support, to get rid of the zombie read only replica that sat in a limbo state for 12+ hours. On another occasion, we encountered an issue with a CloudFormation script. Specifically, we ran into a situation where we were unable to delete a script because it relied on another script that had been deleted successfully earlier. The error message indicated that the script couldn't be deleted because it used an export from the other script that was long gone (but managed to stick around behind the curtains in a phantom state).

#19. /var/log/cloud-init-output.log – During the build out phase, reviewing the output log at this location makes debugging UserData a breeze. The output clearly tells you what went wrong.

#20. CodeDeploy woes – We used the “AutoScalingGroups” bit in “AWS::CodeDeploy::DeploymentGroup”. However, every now and then, the ASG went into a weird state. Fixing that state meant cleaning things up manually, which involved getting a list of the ASG lifecycle hooks, identifying the one for CodeDeploy and then manually deleting it using the CLI. When this became a recurring pain, we switched over to Ec2TagFilters, which made life a lot easier.
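For context, the switch amounts to pointing the deployment group at a tag instead of at the ASG. A boto3 sketch with made-up application, group and role names:

import boto3

codedeploy = boto3.client("codedeploy", region_name="us-east-1")  # assumed region

# Target instances by tag rather than by Auto Scaling group.
codedeploy.create_deployment_group(
    applicationName="example-app",
    deploymentGroupName="example-app-prod",
    serviceRoleArn="arn:aws:iam::123456789012:role/CodeDeployServiceRole",  # placeholder
    ec2TagFilters=[
        {"Key": "Name", "Value": "appserver", "Type": "KEY_AND_VALUE"}
    ],
)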

#21. CloudFormation – Keeping the scripts small and building one bit on top of another keeps them organized, manageable and error free. We started with monolithic scripts with thousands and thousands of lines of code. We rapidly realized this was going to be problematic, and pivoted to breaking them apart. So we built the core infrastructure (VPC, Internet Gateway, NAT Gateway, route tables, routes etc.), followed by the web infrastructure (ELB, SG etc.), the webserver (ASG, alarms etc.), the appserver and so on, building each one up using exports from the previous script.
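Those cross-stack exports are visible through the CloudFormation API, which is handy when figuring out what a downstream script depends on. A small boto3 sketch (first page only; paginate with NextToken for more):

import boto3

cloudformation = boto3.client("cloudformation", region_name="us-east-1")  # assumed region

# List the values exported by existing stacks, i.e. the names downstream stacks import.
for export in cloudformation.list_exports()["Exports"]:
    print(export["ExportingStackId"], export["Name"], "=", export["Value"])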

#22. Lambda – We used Lambdas to execute custom logic in CodePipeline. The custom logic involved executing shell scripts on EC2 instances and moving files from one S3 bucket to another. The shell scripts were executed from CodePipeline through Lambda and SSM (it is a bit more complex than we would like). In addition, we utilized Lambda to send EC2, ASG and RDS alarms and CodePipeline approvals to a HipChat room. We think Lambda offers solid potential in an AWS environment to automate many manual tasks.
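The alarm-to-chat piece is just a small function subscribed to the SNS topic that the alarms publish to. A stripped-down sketch of such a handler; the endpoint URL is a placeholder and the payload shape depends on the chat integration, so treat it as illustrative:

import json
from urllib import request

# Hypothetical HipChat room notification endpoint + token.
WEBHOOK_URL = "https://api.hipchat.com/v2/room/123/notification?auth_token=REPLACE_ME"

def handler(event, context):
    # CloudWatch alarms arrive wrapped in an SNS envelope.
    for record in event["Records"]:
        alarm = json.loads(record["Sns"]["Message"])
        text = "{0} is now {1}: {2}".format(
            alarm.get("AlarmName"), alarm.get("NewStateValue"), alarm.get("NewStateReason"))
        req = request.Request(
            WEBHOOK_URL,
            data=json.dumps({"message": text, "color": "red"}).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        request.urlopen(req)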

#23. AWS Lock In – AWS provides an amazing set of tools (CLI) and SDKs (Java, Python etc.) that make automation a breeze. In addition, AWS is also starting to offer neat solutions for code build, deploy etc. that seamlessly interoperate with other AWS services and technology stacks. Leveraging more and more of these means tightly coupling the applications and processes to the “virtual” environment. Such coupling means moving to another cloud provider like Azure or GCP in the future will be a lot harder to execute. So before digging deeper, it is important to evaluate the long-term cloud strategy and have a crisp view of the path being taken. (The same logic holds true for reserved instances.)

Note: There were areas that we just couldn't get to prior to the production push but plan to tackle soon: (1) Evaluate ELB health checks instead of EC2 health checks for the auto scaling determination (2) Evaluate the federation option in lieu of clustering to avoid the network partition issue which seems to happen every now and then (3) Evaluate custom metrics instead of the free ones (4) Use StackSets for CloudFormation (5) CloudTrail for audit (6) Granular billing alerts (7) Evaluate the use of reserved instances to save some more money (8) Explore Cloudian to reduce the cost even further.

Microservices and organizational barriers

Of late, more and more I hear about traditional IT organizations wanting to implement the microservices architectural pattern. The underlying reasoning is that some of the tech leaders in these organizations view “microservices” as a technical solution that will enable them to achieve velocity in the “market” (i.e. deliver faster, better, cheaper). However, some of these folks fail to see that microservices is an architectural style, like Service Oriented Architecture, rather than a tool or a product or a solution that can be purchased, installed and configured. Martin Fowler, in his excellent article, talked about the technical and non-technical aspects of microservice architecture. In this post I would like to discuss some of the underlying organizational structures and their implications.

There are some key considerations for any traditionally structured organization wanting to move to a microservices-centric model. Yes, it is true that microservices-centric organizations are able to roll out products at amazing velocity (example: Amazon). But there are two key considerations to building an effective microservices-centric architecture: (1) organizational culture and (2) organizational architecture. In this post I will tackle the org architecture and not culture. A _typical_ IT organization is structured like this:
[Figure: OrgStructure – a typical hierarchical IT organization chart]
Some organizations are structured along technology lines, others along business process lines, or sometimes a combination of both. For instance, an organization structured along technology lines will have one leader in charge of ERP systems (like SAP), another leader in charge of .NET based web applications and another leader in charge of all Java based applications. In a business process centric organization, for instance a Mortgage Banking technology group, one leader will head up all mortgage origination related applications, another leader will be in charge of all servicing applications, and so on. In such a setup, implementing a microservices-centric application architecture will be extremely complicated or even impossible. In this setup, the monolithic application is the natural way of getting things done: the monolithic application has a targeted purpose, and the applications are structured along the lines of the hierarchical organization.

One of the reasons IT organizations are organized the way they are… is to achieve cost efficiencies! Period. A typical IT organization is a supporting group (excluding software product firms) and delivers capabilities to _support_ the business. IT is a cost center and hence there is ALWAYS constant pressure to reduce OpEx. One of the ways to reduce expense is by establishing hierarchical centralization to achieve scale and human resource efficiencies (spread a developer across multiple projects). In the last couple of decades, offshoring and outsourcing have been another tool to reduce expenses. While some organizations claim they are in India or elsewhere for talent arbitrage, most traditional technology shops leverage offshore as a cost arbitrage opportunity, with an imposed structure supporting monolithic legacy applications.

In contrast, a service oriented organization is more decentralized. In that model, what we have is a set of services that each provide certain functionality. To make it concrete, let's take mortgage as an example. Some of the possible services are a Modification Service, Underwriting Service, Risk Service (default risk, liquidity risk etc.), Customer Service etc. From an organization perspective, the service is an atomic unit supported by a team that builds, maintains and runs the service. The team is accountable for the service and is on the hook to keep up with changes. In the case of the Modification Service, if there is a HAMP program, the team enhances the service to incorporate HAMP related logic as mandated by regulatory bodies. The app developers in the service oriented world expose smart and rich endpoints (and not dumb ones) that can be consumed by any application (web, mobile, voice, gesture). In addition, with this setup, the service can now be scaled up and down independently. A real life use case: during the mortgage crisis, with the number of defaults increasing rapidly, many mortgage companies struggled to keep up with the volumes both on the technology side and on the organizational side (e.g. call center staffing). Even though firms were able to rapidly hire people to service the defaulting customers, technology pretty much struggled for a long time because of the inability to scale during times of crisis. The companies had to scale multiple monolithic applications (thereby increasing cost) just to keep the lights on. With a service oriented approach, it would have been possible to scale up the critical default related services rather than take a shotgun approach of scaling up everything.

In addition, with a service oriented setup, companies need to fundamentally shift their thinking on how projects are conceptualized and executed. For instance, today, many companies still think along the lines of creating a business case for a capitalizable project with distinct start and end dates. Once the business case gets approved, the CapEx project builds upon existing application(s) by layering in more features and functions. While this model works, it does not align with service oriented thinking. Companies need to fundamentally shift to product centric thinking instead of project centric thinking. For instance, a mortgage company could think of the Modification Service or the Underwriting Service as a product. Features need to be built into the product so that it can support the business needs. It will require a forward thinking product strategist and a product owner who can define the lifecycle and roadmap for the product. Now, if a company figures out a killer Underwriting Service, it could think of monetizing it by exposing the service to other mortgage companies (possibly after fully vetting any legal implications).

Once such services are in place, creating composite applications that leverage the services becomes _relatively_ simple. A user interface centric application (read mobile or web) or a non-UI application (voice, gesture) can access the services and provide the desired outcomes. This model is extremely desirable, especially now, when the desire to achieve rapid velocity in rolling out differentiated products and services is greater than at any other time. Traditional companies steeped in monolithic applications are unable to respond fast enough. The business leaders' perennial gripe is that IT is not fast enough. However, with the focus on cost, supported by rigid org structures and a traditional mindset, CIOs have miles and miles of ground to cover before they can be as nimble as their business leaders want them to be. The business leaders, on the other hand, need to stop looking at technology as a cost center and need to make strategic investments. In the interim, the chasm between what is possible and what is current is giving opportunities for new, nimble service oriented upstarts to come in and disrupt the established players (example: Honest Dollar versus the rest of the 401k industry).

Frobenius norm

A quick intro on how to calculate the Frobenius norm and column-wise 2-norms with NumPy.

import numpy as np

X = np.array([[3., 5., 8.], [4., 12., 15.]])
norms = np.linalg.norm(X, axis=0)  # 2-norm of each column
print(norms)
This results in
[  5.  13.  17.]

This is the equation for the Frobenius norm:

\|A\|_F = \sqrt{ \sum_{i=1}^{m} \sum_{j=1}^{n} a_{ij}^2 }

To calculate the column norms by hand, take the 2-norm of each column:
SQRT ( 3**2 + 4**2 ), SQRT ( 5**2 + 12**2 ), SQRT ( 8**2 + 15**2 )
The output is 5, 13, 17
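Note that axis=0 gives the 2-norm of each column. The Frobenius norm proper is taken over all entries of the matrix, which is what np.linalg.norm returns for a 2-D array when no axis is given. A quick check:

import numpy as np

X = np.array([[3., 5., 8.], [4., 12., 15.]])

# Frobenius norm over the whole matrix: sqrt(3^2 + 5^2 + 8^2 + 4^2 + 12^2 + 15^2) = sqrt(483)
print(np.linalg.norm(X))        # ~21.9773
print(np.sqrt((X ** 2).sum()))  # same value, computed explicitly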

Edu Data Analysis

This analysis is based on the education data made available from data.gov and downloaded from Kaggle. I did this basic analysis using Apache Spark and IPython. While the whole thing could have been done using IPython alone, I included Spark to further my own learning of the technology.

Falling Admission Rate
While admission rates have been falling in general, 5 universities have consistently reduced their rate EVERY single year for the last decade. This analysis covers 2003 through 2013; I have not checked how the admission rates held up in 2014 and 2015.

The five are
(1) ‘Cornell University’,
(2) ‘Vanderbilt University’,
(3) ‘LeTourneau University’,
(4) ‘University of Louisiana at Lafayette’,
(5) ‘Massachusetts Institute of Technology’

[Figure: adm_rates – admission rate by year (2003–2013) for the five universities]

3 out of the 5 are actually not a big surprise, but I was surprised by two of the schools, University of Louisiana at Lafayette and LeTourneau University, where admission rates consistently fell year after year. Clearly they are doing something that is making them popular among students!

Largest Percent Change in Admission Rate
From 2003 through 2013, the admission rate dropped the most for these universities

University                                 Percent Change   Adm Rate in 2003   Adm Rate in 2013
University of Chicago                      0.777            0.3962             0.0881
Mississippi Valley State University        0.770            0.9888             0.2272
Adventist University of Health Sciences    0.743            0.5213             0.1336
Vanderbilt University                      0.725            0.4626             0.1274
Robert Morris University Illinois          0.723            0.7591             0.21
Pitzer College                             0.710            0.501              0.1451
Missouri Valley College                    0.668            0.6677             0.2218
Claremont McKenna College                  0.622            0.3102             0.1173
Colorado College                           0.618            0.5833             0.2228
Corban University                          0.614            0.8364             0.3225

There are some universities on this list that I had not heard about, so it was interesting to see that these schools have reduced their admission rate by such a large number of percentage points.
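The percent change column is simply the relative drop from 2003 to 2013; the Vanderbilt row, for example, works out like this:

# Relative drop in admission rate from 2003 to 2013 (Vanderbilt row from the table above)
adm_2003, adm_2013 = 0.4626, 0.1274
pct_change = (adm_2003 - adm_2013) / adm_2003
print(round(pct_change, 3))  # 0.725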

Mean ACT Change Year over Year
This shows how the mean ACT midpoint changed from 2003 through 2013. It is interesting to see the rate increase rapidly from 2006 onwards and then seem to stabilize around 2011.

[Figure: act_mid_mean_yoy – mean ACT midpoint, year over year (2003–2013)]

    SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df2013 = sqlContext.read()
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/home/.../MERGED2013_PP.csv");


  // Load all the individual files from 2013 through 2003
  // Next select the interesting columns and rename columns
    DataFrame output2013 = df2013.filter("ADM_RATE <> 'NULL'")
        .sort("ADM_RATE")
        .select("OPEID", "HIGHDEG", "ADM_RATE", "SAT_AVG", "ACTCMMID", "TUITIONFEE_IN", "INEXPFTE")
        .withColumnRenamed("ADM_RATE", "ADM_RATE_2012")
        .withColumnRenamed("HIGHDEG", "HIGHDEG_2012")
        .withColumnRenamed("SAT_AVG", "SAT_AVG_2012")
        .withColumnRenamed("ACTCMMID", "ACTCMMID_2012")
        .withColumnRenamed("TUITIONFEE_IN", "TUITIONFEE_IN_2012")
        .withColumnRenamed("INEXPFTE", "INEXPFTE_2012");
 
  // Join all the individual data frames with join key of OPEID
    DataFrame joinedDf = output2013.join(output2012, "OPEID")
        .join(output2011, "OPEID")
        .join(output2010, "OPEID")
        .join(output2009, "OPEID")
        .join(output2008, "OPEID")
        .join(output2007, "OPEID")
        .join(output2006, "OPEID")
        .join(output2005, "OPEID")
        .join(output2004, "OPEID")
        .join(output2003, "OPEID");


  // Filter: keep schools whose admission rate decreased every year and stayed below 1
    DataFrame dfOutput = joinedDf.filter("ADM_RATE <= ADM_RATE_2012 and "
        + "ADM_RATE_2012 <= ADM_RATE_2011 and "
        + "ADM_RATE_2011 <= ADM_RATE_2010 and "
        + "ADM_RATE_2010 <= ADM_RATE_2009 and "
        + "ADM_RATE_2009 <= ADM_RATE_2008 and "
        + "ADM_RATE_2008 <= ADM_RATE_2007 and "
        + "ADM_RATE_2007 <= ADM_RATE_2006 and "
        + "ADM_RATE_2006 <= ADM_RATE_2005 and "
        + "ADM_RATE_2005 <= ADM_RATE_2004 and "
        + "ADM_RATE_2004 <= ADM_RATE_2003 and "
        + "ADM_RATE < 1 and "
        + "ADM_RATE_2012 < 1 and "
        + "ADM_RATE_2011 < 1 and "
        + "ADM_RATE_2010 < 1 and "
        + "ADM_RATE_2009 < 1 and "
        + "ADM_RATE_2008 < 1 and "
        + "ADM_RATE_2007 < 1 and "
        + "ADM_RATE_2006 < 1 and "
        + "ADM_RATE_2005 < 1 and "
        + "ADM_RATE_2004 < 1 and "
        + "ADM_RATE_2003 < 1 ");

    // Write out to file
    dfOutput.limit(1000).coalesce(1).write()
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("/home/.../adm_rate_yoy.csv");
import graphlab
adm_rate_yoy = graphlab.SFrame('adm_rate_yoy.csv/part-00000.csv')
import matplotlib.pyplot as plt
year = [2013, 2012, 2011, 2010, 2009, 2008, 2007, 2006, 2005, 2004, 2003]
adm_rate_v = [0.1274, 0.1423, 0.1642, 0.1795, 0.2015, 0.2533, 0.3282, 0.3528, 0.3528, 0.3831, 0.4626]
adm_rate_c = [0.1556, 0.1655, 0.1797, 0.1836, 0.191, 0.2066, 0.214, 0.2468, 0.2708, 0.2944, 0.3099,]
adm_rate_l = [0.4066, 0.4335, 0.5594, 0.5874, 0.5904, 0.6619, 0.7015, 0.7345, 0.7641, 0.7937, 0.8012,]
adm_rate_ul = [0.593, 0.6036, 0.6593, 0.6688, 0.6718,  0.6784, 0.7023, 0.731, 0.758, 0.8496, 0.8703,]
adm_rate_m = [0.0815, 0.0895, 0.0973, 0.1008, 0.107, 0.1186, 0.1248, 0.1331,  0.1431, 0.1591, 0.1645,]

plt.xlabel('Year')
plt.ylabel('Admission Rate')
plt.plot(year, adm_rate_v, label='Vanderbilt University')
plt.plot(year, adm_rate_c, label='Cornell University')
plt.plot(year, adm_rate_l, label='LeTourneau University')
plt.plot(year, adm_rate_ul, label='University of Louisiana at Lafayette')
plt.plot(year, adm_rate_m, label='Massachusetts Institute of Technology')
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.show()

CSV and Parquet

While CSV is great for readability, for working within Spark, Parquet is the format of choice to speed things up. More details on Apache Parquet can be found here. Essentially, it provides columnar storage that enables complex data to be encoded efficiently in bulk. The difference between a columnar structure and a traditional row-oriented database structure is in how the data is fundamentally organized, which allows searches across large data sets and reads of large chunks of data to be optimized. Parquet provides a real performance advantage over CSV, especially when dealing with large data sets. Here is an excellent article that elegantly articulates the benefits.

To convert CSV data to Parquet, first get Apache Drill, which is pretty straightforward:

wget http://mirrors.sonic.net/apache/drill/drill-1.5.0/apache-drill-1.5.0.tar.gz
tar -xvf apache-drill-1.5.0.tar.gz
cd apache-drill-1.5.0/bin
./drill-embedded

which starts up Apache Drill.

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
Mar 09, 2016 9:46:33 PM org.glassfish.jersey.server.ApplicationHandler initialize
INFO: Initiating Jersey application, version Jersey: 2.8 2014-04-29 01:25:26...
apache drill 1.5.0
"start your sql engine"
0: jdbc:drill:zk=local>

Next, get the CSV data that needs to be converted. But first let's check out the CSV, then set the store format to Parquet. Note that the backtick ` (the key below ~) is not a single quote ‘ (the key below “).

0: jdbc:drill:zk=local> select columns[3] from dfs.`/home/xxx/Development/Data/Education/output/MERGED2013_PP.csv`;
0: jdbc:drill:zk=local> alter session set `store.format`='parquet';
+-------+------------------------+
|  ok   |        summary         |
+-------+------------------------+
| true  | store.format updated.  |
+-------+------------------------+
1 row selected (0.103 seconds)

0: jdbc:drill:zk=local> create table dfs.tmp.`/output/pData/` as select * from dfs.`/home/xxx/Development/Data/Education/output/MERGED2013_PP.csv`;
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+-----------+----------------------------+
| Fragment  | Number of records written  |
+-----------+----------------------------+
| 0_0       | 7805                       |
+-----------+----------------------------+
1 row selected (4.445 seconds)

0: jdbc:drill:zk=local> select columns[3] from dfs.tmp.`/output/pData/*`;


The output goes to /tmp/output/pData

If the output location needs to be changed, navigate to http://localhost:8047/storage/dfs and update the /tmp location:

{
  "type": "file",
  "enabled": true,
  "connection": "file:///",
  "workspaces": {
    "root": {
      "location": "/",
      "writable": false,
      "defaultInputFormat": null
    },
    "tmp": {
      "location": "/tmp",
      "writable": true,
      "defaultInputFormat": null
    }
  },
  "formats": {
    "psv": {
      "type": "text",
      "extensions": [
        "tbl"
      ],
      "delimiter": "|"
    },
    "csv": {
      "type": "text",
      "extensions": [
        "csv"
      ],
      "delimiter": ","
    },
    "tsv": {
      "type": "text",
      "extensions": [
        "tsv"
      ],
      "delimiter": "\t"
    },
    "parquet": {
      "type": "parquet"
    },
    "json": {
      "type": "json"
    },
    "avro": {
      "type": "avro"
    },
    "sequencefile": {
      "type": "sequencefile",
      "extensions": [
        "seq"
      ]
    },
    "csvh": {
      "type": "text",
      "extensions": [
        "csvh"
      ],
      "extractHeader": true,
      "delimiter": ","
    }
  }
}
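Once the Parquet files are written, Spark can read them back directly. A minimal PySpark sketch using a Spark 1.x-style SQLContext; the path matches the Drill output location above:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "parquet-read")
sqlContext = SQLContext(sc)

# Read the Parquet files produced by Drill; the schema comes along for free.
df = sqlContext.read.parquet("/tmp/output/pData")
df.printSchema()
print(df.count())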

Proverbial Count

The proverbial word count example:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class App
{
    public static void main( String[] args ) {
    // Local mode
    SparkConf sparkConf = new SparkConf().setAppName("HelloWorld").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    // Read the input file and split each line into words
    JavaRDD<String> rdd = ctx.textFile("/home/Development/MarsWorkspace/third/resources/Forgiveness1.rtf");
    JavaRDD<String> words = rdd.flatMap(line -> Arrays.asList(line.split(" ")));

    // Pair each word with 1 and sum the counts per word
    JavaPairRDD<String, Integer> counts = words.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
               .reduceByKey((x, y) -> x + y);
    counts.saveAsTextFile("/home/Development/MarsWorkspace/third/resources/out.txt");
    }
}

Hadoop setup

Step 1: Install Java or validate Java install

Step 2: Create user and group in the system

sudo addgroup hadoop
[sudo] password for xxx:
Adding group `hadoop' (GID 1001) ...
Done.

sudo adduser --ingroup hadoop hduser
Adding user `hduser'
...
Adding new user `hduser' (1001) with group `hadoop' ...
Creating home directory `/home/hduser' ...
Copying files from `/etc/skel'
...

Step 3: Validate that SSH is installed (which ssh, which sshd); if not, install it (sudo apt-get install ssh)

Step 4: Create SSH Certificates

su - hduser
Password:
hduser@@@@-Lenovo-Z50-70:~$ ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub.
The key fingerprint is:
8c:0d:ca:68:6e:f0:91:f5:ba:f0:a3:d9:a5:b0:7f:71 hduser@@@@-Lenovo-Z50-70
The key's randomart image is:
+---[RSA 2048]----+
|                 |
|                 |
|    . .          |
|   = o =         |
|. = o o S        |
| = . .. E        |
|  * . .o         |
| . B.+.          |
|  +o*o           |
+-----------------+

$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

Step 5: Check that SSH works.

$ ssh localhost
The authenticity of host 'localhost (127.0.0.1)' can't be established.
ECDSA key fingerprint is 91:51:2b:33:cd:bc:65:45:ca:4c:e2:51:9d:1e:93:f2.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'localhost' (ECDSA) to the list of known hosts.
Welcome to Ubuntu 15.04 (GNU/Linux 3.16.0-55-generic x86_64)

Step 6: Get the binaries from the Hadoop site (as of 02/19, the current version is 2.7, but I got 2.6 for other reasons) and go through the process of setting it up

$ cd /usr/local
$ sudo tar xzf hadoop-2.6.4.tar.gz
$ sudo mv hadoop-2.6.4 hadoop
$ sudo chown -R hduser:hadoop hadoop

vi .bashrc
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HADOOP_INSTALL=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"


$ which javac
/usr/bin/javac

 readlink -f /usr/bin/javac
/usr/lib/jvm/java-8-oracle/bin/javac

$ vi /usr/local/hadoop/etc/hadoop/hadoop-env.sh
Update JAVA_HOME
# The java implementation to use.
export JAVA_HOME=/usr/lib/jvm/java-8-oracle

$ sudo mkdir -p /app/hadoop/tmp
$ sudo chown hduser:hadoop /app/hadoop/tmp

Next, update core-site.xml.
The core-site.xml file informs the Hadoop daemons where the NameNode runs in the cluster. It contains the configuration settings for Hadoop Core, such as I/O settings that are common to HDFS and MapReduce.

There are three HDFS properties which contain hadoop.tmp.dir in their values:

1. dfs.name.dir: directory where namenode stores its metadata, with default value ${hadoop.tmp.dir}/dfs/name.
2. dfs.data.dir: directory where HDFS data blocks are stored, with default value ${hadoop.tmp.dir}/dfs/data.
3. fs.checkpoint.dir: directory where the secondary namenode stores its checkpoints, with default value ${hadoop.tmp.dir}/dfs/namesecondary.

<configuration>
   <property>
       <name>hadoop.tmp.dir</name>
       <value>/app/hadoop/tmp</value>
       <description>A base for other temporary directories.</description>
    </property>

    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:54310</value>
        <description>The name of the default file system.  A URI whose
        scheme and authority determine the FileSystem implementation.  The
        uri's scheme determines the config property (fs.SCHEME.impl) naming
        the FileSystem implementation class.  The uri's authority is used to
        determine the host, port, etc. for a filesystem.</description>
    </property>
</configuration>

Next, edit mapred-site.xml, which is used to specify which framework is being used for MapReduce. Edit this XML file to add the following:

<configuration>
 <property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
 </property>
</configuration>

Next, edit hdfs-site.xml to specify the namenode and the datanode. Create two directories which will contain the namenode and datanode data for this Hadoop installation.

$ sudo mkdir -p /usr/local/hadoop_store/hdfs/namenode
@@@-Lenovo-Z50-70:~$ sudo mkdir -p /usr/local/hadoop_store/hdfs/datanode
@@@-Lenovo-Z50-70:~$ sudo chown -R hduser:hadoop /usr/local/hadoop_store
$ vi /usr/local/hadoop/etc/hadoop/hdfs-site.xml
<configuration>
 <property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
  </description>
 </property>
 <property>
   <name>dfs.namenode.name.dir</name>
   <value>file:/usr/local/hadoop_store/hdfs/namenode</value>
 </property>
 <property>
   <name>dfs.datanode.data.dir</name>
   <value>file:/usr/local/hadoop_store/hdfs/datanode</value>
 </property>
</configuration>

Next step: format the filesystem.

hduser@@@@-Lenovo-Z50-70:~$ hadoop namenode -format
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

16/02/21 09:47:52 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = @@@-Lenovo-Z50-70/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 2.6.4
STARTUP_MSG:   classpath = ...
STARTUP_MSG:   build = https://git-wip-us.apache.org/repos/asf/hadoop.git -r 5082c73637530b0b7e115f9625ed7fac69f937e6; compiled by 'jenkins' on 2016-02-12T09:45Z
STARTUP_MSG:   java = 1.8.0_66
************************************************************/
16/02/21 09:47:52 INFO namenode.NameNode: registered UNIX signal handlers for [TERM, HUP, INT]
16/02/21 09:47:52 INFO namenode.NameNode: createNameNode [-format]
16/02/21 09:47:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Formatting using clusterid: CID-4ad4548d-6cb6-492c-9d38-ee5f665143c2
16/02/21 09:47:53 INFO namenode.FSNamesystem: No KeyProvider found.
16/02/21 09:47:53 INFO namenode.FSNamesystem: fsLock is fair:true
16/02/21 09:47:53 INFO blockmanagement.DatanodeManager: dfs.block.invalidate.limit=1000
16/02/21 09:47:53 INFO blockmanagement.DatanodeManager: dfs.namenode.datanode.registration.ip-hostname-check=true
16/02/21 09:47:53 INFO blockmanagement.BlockManager: dfs.namenode.startup.delay.block.deletion.sec is set to 000:00:00:00.000
16/02/21 09:47:53 INFO blockmanagement.BlockManager: The block deletion will start around 2016 Feb 21 09:47:53
16/02/21 09:47:53 INFO util.GSet: Computing capacity for map BlocksMap
16/02/21 09:47:53 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:53 INFO util.GSet: 2.0% max memory 889 MB = 17.8 MB
16/02/21 09:47:53 INFO util.GSet: capacity      = 2^21 = 2097152 entries
16/02/21 09:47:53 INFO blockmanagement.BlockManager: dfs.block.access.token.enable=false
16/02/21 09:47:53 INFO blockmanagement.BlockManager: defaultReplication         = 1
16/02/21 09:47:53 INFO blockmanagement.BlockManager: maxReplication             = 512
16/02/21 09:47:53 INFO blockmanagement.BlockManager: minReplication             = 1
16/02/21 09:47:53 INFO blockmanagement.BlockManager: maxReplicationStreams      = 2
16/02/21 09:47:53 INFO blockmanagement.BlockManager: replicationRecheckInterval = 3000
16/02/21 09:47:53 INFO blockmanagement.BlockManager: encryptDataTransfer        = false
16/02/21 09:47:53 INFO blockmanagement.BlockManager: maxNumBlocksToLog          = 1000
16/02/21 09:47:54 INFO namenode.FSNamesystem: fsOwner             = hduser (auth:SIMPLE)
16/02/21 09:47:54 INFO namenode.FSNamesystem: supergroup          = supergroup
16/02/21 09:47:54 INFO namenode.FSNamesystem: isPermissionEnabled = true
16/02/21 09:47:54 INFO namenode.FSNamesystem: HA Enabled: false
16/02/21 09:47:54 INFO namenode.FSNamesystem: Append Enabled: true
16/02/21 09:47:54 INFO util.GSet: Computing capacity for map INodeMap
16/02/21 09:47:54 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:54 INFO util.GSet: 1.0% max memory 889 MB = 8.9 MB
16/02/21 09:47:54 INFO util.GSet: capacity      = 2^20 = 1048576 entries
16/02/21 09:47:54 INFO namenode.NameNode: Caching file names occuring more than 10 times
16/02/21 09:47:54 INFO util.GSet: Computing capacity for map cachedBlocks
16/02/21 09:47:54 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:54 INFO util.GSet: 0.25% max memory 889 MB = 2.2 MB
16/02/21 09:47:54 INFO util.GSet: capacity      = 2^18 = 262144 entries
16/02/21 09:47:54 INFO namenode.FSNamesystem: dfs.namenode.safemode.threshold-pct = 0.9990000128746033
16/02/21 09:47:54 INFO namenode.FSNamesystem: dfs.namenode.safemode.min.datanodes = 0
16/02/21 09:47:54 INFO namenode.FSNamesystem: dfs.namenode.safemode.extension     = 30000
16/02/21 09:47:54 INFO namenode.FSNamesystem: Retry cache on namenode is enabled
16/02/21 09:47:54 INFO namenode.FSNamesystem: Retry cache will use 0.03 of total heap and retry cache entry expiry time is 600000 millis
16/02/21 09:47:54 INFO util.GSet: Computing capacity for map NameNodeRetryCache
16/02/21 09:47:54 INFO util.GSet: VM type       = 64-bit
16/02/21 09:47:54 INFO util.GSet: 0.029999999329447746% max memory 889 MB = 273.1 KB
16/02/21 09:47:54 INFO util.GSet: capacity      = 2^15 = 32768 entries
16/02/21 09:47:54 INFO namenode.NNConf: ACLs enabled? false
16/02/21 09:47:54 INFO namenode.NNConf: XAttrs enabled? true
16/02/21 09:47:54 INFO namenode.NNConf: Maximum size of an xattr: 16384
16/02/21 09:47:54 INFO namenode.FSImage: Allocated new BlockPoolId: BP-1807438273-127.0.1.1-1456069674855
16/02/21 09:47:55 INFO common.Storage: Storage directory /usr/local/hadoop_store/hdfs/namenode has been successfully formatted.
16/02/21 09:47:55 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
16/02/21 09:47:55 INFO util.ExitUtil: Exiting with status 0
16/02/21 09:47:55 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at @@@-Lenovo-Z50-70/127.0.1.1
************************************************************/
hduser@@@@-Lenovo-Z50-70:~$

Next, start Hadoop:

hduser@@@@-Lenovo-Z50-70:/usr/local/hadoop/sbin$ ./start-all.sh
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
16/02/21 10:23:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hduser-namenode-@@@-Lenovo-Z50-70.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-@@@-Lenovo-Z50-70.out
Starting secondary namenodes [0.0.0.0]
The authenticity of host '0.0.0.0 (0.0.0.0)' can't be established.
ECDSA key fingerprint is 91:51:2b:33:cd:bc:65:45:ca:4c:e2:51:9d:1e:93:f2.
Are you sure you want to continue connecting (yes/no)? yes
0.0.0.0: Warning: Permanently added '0.0.0.0' (ECDSA) to the list of known hosts.
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-hduser-secondarynamenode-@@@-Lenovo-Z50-70.out
16/02/21 10:23:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn-hduser-resourcemanager-@@@-Lenovo-Z50-70.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-@@@-Lenovo-Z50-70.out

Check the running status:

hduser@@@@-Lenovo-Z50-70:/usr/local/hadoop/sbin$ jps
6624 NodeManager
6016 NameNode
6162 DataNode
6503 ResourceManager
6345 SecondaryNameNode
6941 Jps

Access the web interface at http://localhost:50070

Scala Hello World in Scala IDE

After downloading the Scala IDE and installing https://github.com/sonatype/m2eclipse-scala, I still ran into the issue of not finding the right archetype. This was solved by adding a remote catalog: http://repo1.maven.org/maven2/archetype-catalog.xml

Here is the POM XML

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.vitarkah.scala</groupId>
  <artifactId>first</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2015</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.10.6</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
  </properties>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalactic</groupId>
      <artifactId>scalactic_2.10</artifactId>
      <version>2.2.6</version>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_2.10</artifactId>
      <version>2.2.6</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- mixed scala/java compile -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
      <!-- for fatjar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>fully.qualified.MainClass</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
    <pluginManagement>
      <plugins>
        <!--This plugin's configuration is used to store Eclipse m2e settings
          only. It has no influence on the Maven build itself. -->
        <plugin>
          <groupId>org.eclipse.m2e</groupId>
          <artifactId>lifecycle-mapping</artifactId>
          <version>1.0.0</version>
          <configuration>
            <lifecycleMappingMetadata>
              <pluginExecutions>
                <pluginExecution>
                  <pluginExecutionFilter>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>
                      maven-scala-plugin
                    </artifactId>
                    <versionRange>
                      [2.15.2,)
                    </versionRange>
                    <goals>
                      <goal>compile</goal>
                      <goal>testCompile</goal>
                    </goals>
                  </pluginExecutionFilter>
                  <action>
                    <execute></execute>
                  </action>
                </pluginExecution>
              </pluginExecutions>
            </lifecycleMappingMetadata>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

</project>

Here is the “hello world” Scala app

package com.vitarkah.scala.first
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
/**
 * @author ${user.name}
 */

object App {
 
def main(args: Array[String]) {

    // initialise spark context
    val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
    val sc = new SparkContext(conf)
   
    // do stuff
    println("Hello, world!")
   
    // terminate spark context
    sc.stop()
   
  }

}

Here is the output

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/02/14 21:04:59 INFO SparkContext: Running Spark version 1.6.0
16/02/14 21:04:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/14 21:05:00 WARN Utils: Your hostname, vichu-Lenovo-Z50-70 resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlan0)
16/02/14 21:05:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/02/14 21:05:00 INFO SecurityManager: Changing view acls to: vichu
16/02/14 21:05:00 INFO SecurityManager: Changing modify acls to: vichu
16/02/14 21:05:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vichu); users with modify permissions: Set(vichu)
16/02/14 21:05:00 INFO Utils: Successfully started service 'sparkDriver' on port 51488.
16/02/14 21:05:00 INFO Slf4jLogger: Slf4jLogger started
16/02/14 21:05:01 INFO Remoting: Starting remoting
16/02/14 21:05:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.140:38851]
16/02/14 21:05:01 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 38851.
16/02/14 21:05:01 INFO SparkEnv: Registering MapOutputTracker
16/02/14 21:05:01 INFO SparkEnv: Registering BlockManagerMaster
16/02/14 21:05:01 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-37186a0b-a60a-41d7-8561-fd4054ef67f7
16/02/14 21:05:01 INFO MemoryStore: MemoryStore started with capacity 1088.6 MB
16/02/14 21:05:01 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/14 21:05:01 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/02/14 21:05:01 INFO SparkUI: Started SparkUI at http://192.168.1.140:4040
16/02/14 21:05:01 INFO Executor: Starting executor ID driver on host localhost
16/02/14 21:05:01 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 50884.
16/02/14 21:05:01 INFO NettyBlockTransferService: Server created on 50884
16/02/14 21:05:01 INFO BlockManagerMaster: Trying to register BlockManager
16/02/14 21:05:01 INFO BlockManagerMasterEndpoint: Registering block manager localhost:50884 with 1088.6 MB RAM, BlockManagerId(driver, localhost, 50884)
16/02/14 21:05:01 INFO BlockManagerMaster: Registered BlockManager

Hello, world! <<--- There it is!


16/02/14 21:05:01 INFO SparkUI: Stopped Spark web UI at http://192.168.1.140:4040
16/02/14 21:05:01 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/02/14 21:05:01 INFO MemoryStore: MemoryStore cleared
16/02/14 21:05:01 INFO BlockManager: BlockManager stopped
16/02/14 21:05:01 INFO BlockManagerMaster: BlockManagerMaster stopped
16/02/14 21:05:01 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/02/14 21:05:01 INFO SparkContext: Successfully stopped SparkContext
16/02/14 21:05:01 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/02/14 21:05:01 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/02/14 21:05:01 INFO ShutdownHookManager: Shutdown hook called
16/02/14 21:05:01 INFO ShutdownHookManager: Deleting directory /tmp/spark-44aa9e76-6aec-4162-8856-c1605c8f060c

Spark Java HelloWorld

1. First, download and install Eclipse Mars for Ubuntu 15 (pretty straightforward from here)

2. Create a Maven project in Eclipse. Straightforward.

3. Add the Spark dependency:

    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.1</version>
      <scope>provided</scope>
    </dependency>

4. Hello World Java

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * Hello world!
 *
 */

public class App {
  public static void main(String[] args) {

    // Local mode
    SparkConf sparkConf = new SparkConf().setAppName("HelloWorld").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    String[] arr = new String[] { "A1", "B2", "C3", "D4", "F5" };
    List<String> inputList = Arrays.asList(arr);
    JavaRDD<String> inputRDD = ctx.parallelize(inputList);
    inputRDD.foreach(new VoidFunction<String>() {

      public void call(String input) throws Exception {
        System.out.println(input);

      }
    });

  }
}

5. Output

16/02/13 07:27:07 WARN util.Utils: Your hostname, vichu-Lenovo-Z50-70 resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlan0)
16/02/13 07:27:07 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/02/13 07:27:07 INFO spark.SecurityManager: Changing view acls to: vichu
16/02/13 07:27:07 INFO spark.SecurityManager: Changing modify acls to: vichu
16/02/13 07:27:07 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vichu); users with modify permissions: Set(vichu)
16/02/13 07:27:07 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/02/13 07:27:07 INFO Remoting: Starting remoting
16/02/13 07:27:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.140:39886]
16/02/13 07:27:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.1.140:39886]
16/02/13 07:27:07 INFO util.Utils: Successfully started service 'sparkDriver' on port 39886.
16/02/13 07:27:07 INFO spark.SparkEnv: Registering MapOutputTracker
16/02/13 07:27:07 INFO spark.SparkEnv: Registering BlockManagerMaster
16/02/13 07:27:07 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20160213072707-5f39
16/02/13 07:27:08 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 56037.
16/02/13 07:27:08 INFO network.ConnectionManager: Bound socket to port 56037 with id = ConnectionManagerId(192.168.1.140,56037)
16/02/13 07:27:08 INFO storage.MemoryStore: MemoryStore started with capacity 945.8 MB
16/02/13 07:27:08 INFO storage.BlockManagerMaster: Trying to register BlockManager
16/02/13 07:27:08 INFO storage.BlockManagerMasterActor: Registering block manager 192.168.1.140:56037 with 945.8 MB RAM, BlockManagerId(<driver>, 192.168.1.140, 56037, 0)
16/02/13 07:27:08 INFO storage.BlockManagerMaster: Registered BlockManager
16/02/13 07:27:08 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e86643a4-7d63-4b1b-9b3c-95178861aa1e
16/02/13 07:27:08 INFO spark.HttpServer: Starting HTTP Server
16/02/13 07:27:08 INFO server.Server: jetty-8.1.14.v20131031
16/02/13 07:27:08 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44346
16/02/13 07:27:08 INFO util.Utils: Successfully started service 'HTTP file server' on port 44346.
16/02/13 07:27:08 INFO server.Server: jetty-8.1.14.v20131031
16/02/13 07:27:08 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
16/02/13 07:27:08 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
16/02/13 07:27:08 INFO ui.SparkUI: Started SparkUI at http://192.168.1.140:4040
16/02/13 07:27:08 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.1.140:39886/user/HeartbeatReceiver
16/02/13 07:27:08 INFO spark.SparkContext: Starting job: foreach at App.java:24
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Got job 0 (foreach at App.java:24) with 1 output partitions (allowLocal=false)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Final stage: Stage 0(foreach at App.java:24)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Parents of final stage: List()
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Missing parents: List()
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at App.java:23), which has no missing parents
16/02/13 07:27:08 INFO storage.MemoryStore: ensureFreeSpace(1504) called with curMem=0, maxMem=991753666
16/02/13 07:27:08 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1504.0 B, free 945.8 MB)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at parallelize at App.java:23)
16/02/13 07:27:08 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/02/13 07:27:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1224 bytes)
16/02/13 07:27:08 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
A1
B2
C3
D4
F5
16/02/13 07:27:08 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 585 bytes result sent to driver
16/02/13 07:27:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 39 ms on localhost (1/1)
16/02/13 07:27:08 INFO scheduler.DAGScheduler: Stage 0 (foreach at App.java:24) finished in 0.054 s
16/02/13 07:27:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/02/13 07:27:08 INFO spark.SparkContext: Job finished: foreach at App.java:24, took 0.240135089 s
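
Note that the A1..F5 lines above come from the System.out.println calls inside foreach, which runs on the executors. In local mode the executor lives in the same JVM, so the output lands in the console, but on a real cluster it would end up in the executor logs instead. A common variant is to transform the data and collect the results back to the driver before printing; here is a rough sketch against the same Java API (the class name CollectApp and the "Hello, " prefix are just illustrative):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class CollectApp {
  public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf().setAppName("HelloWorldCollect").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    JavaRDD<String> inputRDD = ctx.parallelize(Arrays.asList("A1", "B2", "C3", "D4", "F5"));

    // map runs on the executors; collect() ships the results back to the driver
    List<String> results = inputRDD.map(new Function<String, String>() {
      public String call(String input) throws Exception {
        return "Hello, " + input;
      }
    }).collect();

    // Printing here happens on the driver, so it always ends up in the console
    for (String s : results) {
      System.out.println(s);
    }

    ctx.stop();
  }
}

Keep in mind that collect() pulls the whole RDD into driver memory, so it is only appropriate for small results like this one.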

Installing Apache Spark

First things first: you've got to have the latest Java

$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java8-installer

Next, install Scala. The latest version as of today is 2.11.7, but I ran into a brick wall with that version, so I am using 2.10.6.

$ wget http://www.scala-lang.org/files/archive/scala-2.10.6.deb
$ sudo dpkg -i scala-2.10.6.deb

$ scala
Welcome to Scala version 2.10.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_66).
Type in expressions to have them evaluated.
Type :help for more information.

scala> :q

$ sudo apt-get install git
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0.tgz
$ tar xvf spark-1.6.0.tgz
$ rm spark-1.6.0.tgz
$ cd spark-1.6.0/
$ build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.4 -DskipTests clean package

Notes:
:q quits the Scala shell.
Spark does not yet support its JDBC component for Scala 2.11, hence Scala 2.10.6 above.

./run-example SparkPi
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/02/13 06:56:35 INFO SparkContext: Running Spark version 1.6.0
16/02/13 06:56:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/13 06:56:35 WARN Utils: Your hostname, .... resolves to a loopback address: 127.0.1.1; using 192.168.1.140 instead (on interface wlan0)
16/02/13 06:56:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/02/13 06:56:35 INFO SecurityManager: Changing view acls to: ...
16/02/13 06:56:35 INFO SecurityManager: Changing modify acls to: ....
16/02/13 06:56:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(...); users with modify permissions: Set(...)
16/02/13 06:56:36 INFO Utils: Successfully started service 'sparkDriver' on port 34966.
16/02/13 06:56:36 INFO Slf4jLogger: Slf4jLogger started
16/02/13 06:56:36 INFO Remoting: Starting remoting
16/02/13 06:56:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.140:49553]
16/02/13 06:56:36 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 49553.
16/02/13 06:56:36 INFO SparkEnv: Registering MapOutputTracker
16/02/13 06:56:36 INFO SparkEnv: Registering BlockManagerMaster
16/02/13 06:56:36 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-1e81dc84-91ff-4503-ab3e-7fa8adbac78e
16/02/13 06:56:36 INFO MemoryStore: MemoryStore started with capacity 511.1 MB
16/02/13 06:56:36 INFO SparkEnv: Registering OutputCommitCoordinator
16/02/13 06:56:36 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/02/13 06:56:36 INFO SparkUI: Started SparkUI at http://192.168.1.140:4040
16/02/13 06:56:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/httpd-6774316e-0973-43c7-b58d-3cc0a1991f95
16/02/13 06:56:36 INFO HttpServer: Starting HTTP Server
16/02/13 06:56:36 INFO Utils: Successfully started service 'HTTP file server' on port 42625.
16/02/13 06:56:37 INFO SparkContext: Added JAR file:/home/.../spark/spark-1.6.0/examples/target/scala-2.10/spark-examples-1.6.0-hadoop2.6.4.jar at http://192.168.1.140:42625/jars/spark-examples-1.6.0-hadoop2.6.4.jar with timestamp 1455368197141
16/02/13 06:56:37 INFO Executor: Starting executor ID driver on host localhost
16/02/13 06:56:37 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58239.
16/02/13 06:56:37 INFO NettyBlockTransferService: Server created on 58239
16/02/13 06:56:37 INFO BlockManagerMaster: Trying to register BlockManager
16/02/13 06:56:37 INFO BlockManagerMasterEndpoint: Registering block manager localhost:58239 with 511.1 MB RAM, BlockManagerId(driver, localhost, 58239)
16/02/13 06:56:37 INFO BlockManagerMaster: Registered BlockManager
16/02/13 06:56:37 INFO SparkContext: Starting job: reduce at SparkPi.scala:36
16/02/13 06:56:37 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) with 2 output partitions
16/02/13 06:56:37 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:36)
16/02/13 06:56:37 INFO DAGScheduler: Parents of final stage: List()
16/02/13 06:56:37 INFO DAGScheduler: Missing parents: List()
16/02/13 06:56:37 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
16/02/13 06:56:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1888.0 B, free 1888.0 B)
16/02/13 06:56:38 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1202.0 B, free 3.0 KB)
16/02/13 06:56:38 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58239 (size: 1202.0 B, free: 511.1 MB)
16/02/13 06:56:38 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/02/13 06:56:38 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
16/02/13 06:56:38 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/02/13 06:56:38 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2156 bytes)
16/02/13 06:56:38 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2156 bytes)
16/02/13 06:56:38 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/02/13 06:56:38 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/02/13 06:56:38 INFO Executor: Fetching http://192.168.1.140:42625/jars/spark-examples-1.6.0-hadoop2.6.4.jar with timestamp 1455368197141
16/02/13 06:56:38 INFO Utils: Fetching http://192.168.1.140:42625/jars/spark-examples-1.6.0-hadoop2.6.4.jar to /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/userFiles-8a089d63-89cb-4bf4-a116-b1dcfb15e6c1/fetchFileTemp3377846363451941090.tmp
16/02/13 06:56:38 INFO Executor: Adding file:/tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/userFiles-8a089d63-89cb-4bf4-a116-b1dcfb15e6c1/spark-examples-1.6.0-hadoop2.6.4.jar to class loader
16/02/13 06:56:38 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1031 bytes result sent to driver
16/02/13 06:56:38 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1031 bytes result sent to driver
16/02/13 06:56:38 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 762 ms on localhost (1/2)
16/02/13 06:56:38 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 736 ms on localhost (2/2)
16/02/13 06:56:38 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/02/13 06:56:38 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) finished in 0.777 s
16/02/13 06:56:38 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 1.003757 s

Pi is roughly 3.13756 <<-- There it is!


16/02/13 06:56:38 INFO SparkUI: Stopped Spark web UI at http://192.168.1.140:4040
16/02/13 06:56:38 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/02/13 06:56:38 INFO MemoryStore: MemoryStore cleared
16/02/13 06:56:38 INFO BlockManager: BlockManager stopped
16/02/13 06:56:38 INFO BlockManagerMaster: BlockManagerMaster stopped
16/02/13 06:56:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/02/13 06:56:38 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/02/13 06:56:38 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/02/13 06:56:38 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/02/13 06:56:39 INFO SparkContext: Successfully stopped SparkContext
16/02/13 06:56:39 INFO ShutdownHookManager: Shutdown hook called
16/02/13 06:56:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f
16/02/13 06:56:39 INFO ShutdownHookManager: Deleting directory /tmp/spark-29f5d77e-a0ea-4986-95fb-6d3b9104c18f/httpd-6774316e-0973-43c7-b58d-3cc0a1991f95
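
For reference, SparkPi is a tiny Monte Carlo estimate: throw random points at a square and count how many land inside the inscribed circle. Roughly the same computation, written against the Java API used in the HelloWorld post above, might look like the sketch below (modeled loosely on the JavaSparkPi example bundled with Spark; the class name JavaPi and the slice/sample counts are just illustrative):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class JavaPi {
  public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf().setAppName("JavaPi").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    // Illustrative sample size: 2 partitions of 100,000 points each
    int slices = 2;
    int n = 100000 * slices;
    List<Integer> samples = new ArrayList<Integer>(n);
    for (int i = 0; i < n; i++) {
      samples.add(i);
    }

    JavaRDD<Integer> dataSet = ctx.parallelize(samples, slices);

    // For each sample, throw a random dart at the 2x2 square and record 1 if it
    // lands inside the unit circle, 0 otherwise; then add up the hits.
    int count = dataSet.map(new Function<Integer, Integer>() {
      public Integer call(Integer i) throws Exception {
        double x = Math.random() * 2 - 1;
        double y = Math.random() * 2 - 1;
        return (x * x + y * y < 1) ? 1 : 0;
      }
    }).reduce(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
      }
    });

    // Circle area / square area = pi/4, so pi is about 4 * hits / samples
    System.out.println("Pi is roughly " + 4.0 * count / n);

    ctx.stop();
  }
}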

Install SBT. The most current version is 0.13.9.

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 642AC823
sudo apt-get update
sudo apt-get install sbt