As part of a recent HumanGeo effort, I was faced with the challenge of detecting patterns and anomalies in large geospatial datasets using various statistics and machine learning methods. Given the size of the datasets, the speed at which they needed to be processed, and other project constraints, I knew I had to develop a scalable solution that could easily be deployed to AWS. I preferred to use Apache Spark, given my own and HumanGeo’s positive experiences with it. In addition, we needed to develop a solution quickly, so naturally I turned to Python 3.4. It was already part of our tech stack, and let’s be real, Python makes life easier (for the most part). Given these requirements, my quest for the best solution quickly led me to Amazon’s Elastic MapReduce (EMR) service.

For those who aren’t familiar with EMR, it’s basically a scalable, Hadoop-based Amazon web service that allows users to quickly and easily launch and manage clusters of preconfigured Elastic Compute Cloud (EC2) instances. EMR also acts as an interface to the user-selected open-source analytics frameworks installed on its clusters, making them quick and easy to manage. The icing on the cake was that EMR can be preconfigured to run Spark on launch, and Spark jobs can be written in Python. The process of creating my Spark jobs, setting up EMR, and running my jobs was easy…until I hit a few major snags, mostly due to using Python 3.4. Whomp, whomp. Fortunately, I was able to solve these problems. This blog post is dedicated to those who are encountering the same, or similar, problems.

Obstacle 1: Installing Python dependencies on all nodes

The first obstacle I had to overcome was installing the job’s Python dependencies. Since Spark is a distributed environment, each node of the EMR cluster needs all the dependencies required to run its jobs. For jobs written in Java, this is usually handled by distributing the job’s *.jar-with-dependencies. Python jobs don’t have a jar-with-dependencies, however, so you have to work around this. Luckily, you can use an EMR bootstrap script to do it. The bootstrap script runs on each node of your EMR cluster after it’s provisioned. It can be used to tweak the stock EMR environment to fit your application’s needs, including installing additional dependencies and modifying configuration. In my case, I used the bootstrap script to install the Python dependencies system-wide with pip. See the code snippet below.

#!/bin/bash
...

sudo pip-3.4 install -r https://s3.amazonaws.com/bucket/requirements.txt

....
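For completeness, a bootstrap script like this gets attached when the cluster is launched. A minimal sketch with the AWS CLI (the cluster name, release label, key name, instance details, and bucket path here are all hypothetical):

```shell
# Hypothetical cluster launch attaching the bootstrap script from S3;
# the script runs on every node after it is provisioned.
aws emr create-cluster \
    --name "SparkCluster" \
    --release-label emr-4.7.2 \
    --applications Name=Spark \
    --bootstrap-actions Path=s3://bucket/bootstrap.sh \
    --ec2-attributes KeyName=myKey \
    --instance-type m3.xlarge \
    --instance-count 3 \
    --use-default-roles
```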

This can also be accomplished with spark-submit’s --py-files argument, which accepts a zip of Python dependencies, including .py files and .eggs. However, there seems to be a consensus that this approach is messy and unreliable.

Obstacle 2: Configuring EMR with Python 3.4

I found this blog post detailing how to run EMR with Python 3.4. It provides a JSON configuration that basically exports an environment variable that PySpark will use to determine the version of Python under which it will run. Note that this is sufficient for using PySpark on EMR, but spark-submit will require additional configuration.

 
[
    {
        "Classification": "spark-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "PYSPARK_PYTHON": "python34"
                },
                "Configurations": []
            }
        ]
    }
]
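For reference, this JSON gets supplied to EMR at cluster-creation time. With the AWS CLI it might look like the following sketch (the file name and all cluster parameters are hypothetical):

```shell
# myConfig.json holds the JSON configuration shown above;
# EMR applies it to spark-env when the cluster comes up.
aws emr create-cluster \
    --name "SparkCluster" \
    --release-label emr-4.7.2 \
    --applications Name=Spark \
    --configurations file://./myConfig.json \
    --instance-type m3.xlarge \
    --instance-count 3 \
    --use-default-roles
```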

Obstacle 3: Copying job config to all nodes

Since the Spark job needs a configuration file, that file needs to be present on all of the nodes in the cluster. The AWS CLI provides a command that copies a file from one AWS location to another. In my case, I needed to copy a file from S3 to all of my EMR nodes, so I placed the copy command in my bootstrap script. Since the bootstrap script runs on all nodes, the config file was copied from S3 to each node in the cluster.

aws s3 cp s3://<myBucket>/conf/myConf.yml /home/hadoop/

Obstacle 4: Getting data into Spark Job

This was straightforward. The job needs to read from a dump file which contains lines of JSON. I first uploaded the dump file, myFile.dump, to an S3 bucket, myBucket. Then, I used urllib.request to read the file from S3 and convert it to a Spark object. Here’s the code snippet.

import json
import urllib.request as urllib2

from pyspark import SparkContext
from pyspark.sql import HiveContext

.....

sc = SparkContext(appName="App")
sqlContext = HiveContext(sc) # use Hive context for window functions

.....

    myurl = "https://<your_s3_path>/myBucket/myFile.dump"
    myfile = urllib2.urlopen(myurl)
    content = [line.decode('utf-8').rstrip('\n') for line in myfile]
    dict_list = [] # will hold json objects for dataframe 
    for line in content:
        json_line = json.loads(line) # convert to object
        dict_list.append(json_line)

    spark_df = sqlContext.createDataFrame(dict_list)

......
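To make the decode-and-parse step concrete outside of Spark, here’s a self-contained sketch of the same loop; the BytesIO object is just a stand-in for the stream that urlopen returns from S3:

```python
import io
import json

# Stand-in for the bytes stream returned by urllib.request.urlopen;
# each line is a UTF-8 encoded JSON object, as in myFile.dump.
raw = io.BytesIO(b'{"id": 1, "name": "a"}\n{"id": 2, "name": "b"}\n')

# Decode each line and parse it into a dict, mirroring the job's loop.
dict_list = [json.loads(line.decode('utf-8').rstrip('\n')) for line in raw]

print(len(dict_list))  # -> 2
```

The resulting list of dicts is exactly what sqlContext.createDataFrame expects.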

Obstacle 5: Submitting Python 3.4 Spark Jobs to EMR

This was by far the most time consuming of all the challenges. Before we go forward, let me give a brief explanation of how Spark jobs are submitted to EMR.

EMR runs Spark jobs by adding a “Step” to the EMR cluster. There are multiple step types from which to choose; in my case, I chose the Spark application type. When a Spark application type is selected, the “Step” essentially becomes a thin wrapper around the spark-submit command. It lets users specify spark-submit options, i.e. spark-submit command parameters; if no options are specified, EMR uses the default Spark configuration. Additionally, you must provide an application location. In my case, the application location was a Python file on S3. There are also options to specify arguments for the application and an action to take if the application fails. Upon adding a step, the EMR cluster will attempt to use spark-submit to run the job. One important thing to note about Spark on EMR is that its executors run in Apache Hadoop YARN containers. If you’re interested in how this works, I highly recommend this awesome post.
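As a sketch, adding such a step from the AWS CLI might look like the following (the cluster id and S3 path are hypothetical); everything in the Args list is handed to spark-submit:

```shell
# Hypothetical cluster id and application location;
# Args are passed through to the spark-submit command.
aws emr add-steps \
    --cluster-id j-XXXXXXXX \
    --steps Type=Spark,Name="MyJob",ActionOnFailure=CONTINUE,Args=[s3://myBucket/myJob.py]
```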

Python Probz

After going through the above process, I noticed that the steps kept failing. I used the EMR console to check the stderr logs and noticed that the jobs were being submitted but failing. Here’s an example of the error:

INFO Client: Application report for application(state: ACCEPTED)
INFO Client: Application report for application(state: ACCEPTED)
INFO Client: Application report for application (state: FAILED)
 04:58:03 INFO Client: 
     client token: N/A
     diagnostics: Application failed 2 times due to AM Container for appattempt exited with  exitCode: 1
For more detailed output, check application tracking page: click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: 
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
    at org.apache.hadoop.util.Shell.run(Shell.java:456)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 
     final status: FAILED
     tracking URL: 
     user: hadoop
Exception in thread "main" org.apache.spark.SparkException: Application application_ finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1034)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1081)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 INFO ShutdownHookManager: Shutdown hook called
 INFO ShutdownHookManager: Deleting directory 
Command exiting with ret '1'

Unfortunately, this isn’t very helpful, as many things could cause the spark submit to fail. So I ssh’d into the master EMR node to investigate. I decided to look at the YARN logs first, since the spark executors run inside YARN containers.

yarn logs -applicationId <application_id_in_emr_log>

I found this:

16/07/29 02:33:45 INFO ShutdownHookManager: Shutdown hook called
End of LogType:stderr

LogType:stdout
Log Contents:
Traceback (most recent call last):
  File "xxxx" line x, in <module>
    import urllib.request as urllib2
NameError: name 'request' is not defined
End of LogType:stdout

I’ll spare you the nitty-gritty details about urllib, but trust that this error happens when you try to import urllib.request in versions of Python < 3. This meant Spark/EMR was actually using Python 2.7, despite my providing configuration telling it to use Python 3.4. I verified that the PySpark shell was indeed using Python 3.4. Then I noticed that the job, which is started via the spark-submit process, was running on Python 2.7. This made sense, since PySpark and spark-submit are two different processes. After some research, I found several sources that recommended adding the following to spark-env.sh:

export PYSPARK_PYTHON=python34
export PYSPARK_DRIVER_PYTHON=python34

or manually exporting them from the command line (in my case on each node).

I tried both and relaunched the cluster with no luck. I went down the rabbit hole of modifying the PATH, creating aliases, etc., and even tried creating a virtualenv with Python 3.4 and activating it in my code, all to no avail.
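When chasing this kind of interpreter mismatch, it helps to make the job fail fast with a clear message instead of an obscure import error. A minimal sketch (independent of Spark, not from the original job) that can be dropped at the top of the driver script:

```python
import sys

# Report which interpreter this process is actually running under; when
# submitted to EMR, this shows up in the YARN container's stdout log.
print("Running under Python %d.%d" % (sys.version_info[0], sys.version_info[1]))

# Fail immediately with a clear message if launched under Python 2.
assert sys.version_info[0] >= 3, "expected Python 3, got %s" % sys.version
```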

The Solution

I then examined the YARN documentation and observed these configuration options: spark.yarn.appMasterEnv and spark.executorEnv. Both seemed promising, so I decided to pass them into the step config via the --conf parameter, like so:

--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python34 
--conf spark.executorEnv.PYSPARK_PYTHON=python34

Added the step. Cluster ran. Job completed. Discrete yet aggressive fist pump at my desk.
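Putting the pieces together, the spark-submit invocation that the step effectively runs looks roughly like this (the application path is hypothetical):

```shell
# Set the Python interpreter in both the YARN application master's
# and the executors' environments, then run the job from S3.
spark-submit \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=python34 \
    --conf spark.executorEnv.PYSPARK_PYTHON=python34 \
    s3://myBucket/myJob.py
```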

Random Helpful Tidbits

Important Locations on EMR nodes:

  • /usr/lib/spark/conf - Spark config
  • /usr/lib/hadoop/conf - Hadoop config
  • /var/log/ - logs

  • hadoop user - the user that runs spark-submit jobs

Commands:

yarn logs -applicationId <applicationID> # shows yarn logs for an application

Conclusion

Despite some small headaches, I think EMR is a great service that will make life a lot easier for many. It’s a huge time saver, since all user-selected configuration is handled by the EMR service and managed by Amazon. The user interfaces make it easy to stand up and configure a cluster, and I only encountered some minor configuration troubles, mostly because of my specific use case. For my fellow Spark and Python enthusiasts, I hope this helps. For others, I hope you learned something. For both, thanks for reading, and stay tuned for a follow-up post focusing on the pattern analytics!