
How to run and monitor a Spark process on BDCS

We are going to run a Spark application with spark-submit and also see how to monitor the job.

$ su - hdfs

Create the PySpark application, spark-sales.py:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

sparkMaster = "spark://spark-master:7077"
hiveMetastore = "thrift://localhost:9083"

def create_spark_session(app_name="Sales_Gold_Application"):
    # Build (or reuse) a SparkSession with Hive support enabled.
    # Note: this example uses a local master; sparkMaster above could be passed instead.
    spark_session = SparkSession \
        .builder \
        .master("local") \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()

    spark_session.conf.set("spark.sql.shuffle.partitions", 6)
    #spark_session.conf.set("spark.driver.memory", "1g")
    spark_session.conf.set("spark.executor.memory", "64g")
    spark_session.conf.set("spark.executor.cores", "16")
    spark_session.conf.set("hive.exec.dynamic.partition", "true")
    spark_session.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    spark_session.conf.set("hive.enforce.bucketing", "true")
    spark_session.conf.set("hive.metastore.uris", hiveMetastore)

    spark_session.sparkContext.setLogLevel("WARN")
    return spark_session

def main():
    # Create the SparkSession
    SS = create_spark_session()

    SS.sql("USE bigdata_prd")

    # Select all data and cache it
    sql = """  SELECT  SITE            ,
                        ART_CODE        ,
                        ART_DESC        ,
                        SUBCLASS        ,
                        BRAND_CODE      ,
                        BRAND_DESC      ,
                        PRINC_CODE      ,
                        PRINC_DESC      ,
                        PRIV_LABEL      ,
                        ART_TYPE        ,
                        SALES           ,
                        COGS            ,
                        PPN             ,
                        QTY             ,
                        STORE_CODE      ,
                        DATE_TRN    
                 FROM  fs_sales_gold
            """

    df_select = SS.sql(sql).cache()
    df_select.createOrReplaceTempView("fs_sales_gold_temp")

    sql = """ INSERT INTO  fact_sales_gold partition (STORE_CODE,DATE_TRN)   
                SELECT  SITE            ,
                        ART_CODE        ,
                        ART_DESC        ,
                        SUBCLASS        ,
                        BRAND_CODE      ,
                        BRAND_DESC      ,
                        PRINC_CODE      ,
                        PRINC_DESC      ,
                        PRIV_LABEL      ,
                        ART_TYPE        ,
                        SALES           ,
                        COGS            ,
                        PPN             ,
                        QTY             ,
                        STORE_CODE      ,
                        DATE_TRN    
                 FROM  fs_sales_gold_temp
            """

    # Run the INSERT; for this statement Spark returns an empty DataFrame
    df_select = SS.sql(sql)
    df_select.show(5)

if __name__ == '__main__':
    main()

Run Spark Submit

$ su - hdfs 
$ spark-submit spark-sales.py
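
If the defaults in the script do not fit your cluster, the master and resources can also be set on the command line. The following is only a sketch; running on YARN and the memory/core values are assumptions, adjust them to your BDCS cluster:

$ spark-submit \
    --master yarn \
    --deploy-mode client \
    --executor-memory 4g \
    --executor-cores 2 \
    --conf spark.sql.shuffle.partitions=6 \
    spark-sales.py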

 

Monitoring Spark job

We can monitor a Spark job on BDCS using the Spark UI at http://129.154.85.xxx:4041/environment/.
Note: the Spark UI can only be accessed before the job finishes.

Spark History UI: http://129.154.85.xxx:18080/
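
Besides the web pages, both endpoints expose Spark's monitoring REST API, which is handy for scripted checks. A minimal sketch, assuming the same host and default ports; <app-id> is a placeholder for the application ID shown in the UI:

# List applications known to the History Server
$ curl http://129.154.85.xxx:18080/api/v1/applications

# Job status of a running application via the Spark UI
$ curl http://129.154.85.xxx:4041/api/v1/applications/<app-id>/jobs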

[Screenshot: Spark History UI]

Run Spark History on BDCS

Connect to BDCS

# Use hdfs, not root, to start the History Server
$ su - hdfs
# Check that HDFS is accessible
$ hdfs dfs -ls /
$ cd $SPARK_HOME/sbin
# Start the History Server
$ ./start-history-server.sh
# Stop the History Server
$ ./stop-history-server.sh
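
To confirm the History Server is actually up after starting it, look for the HistoryServer process (assuming the JDK's jps tool is on the PATH) or simply open port 18080 in a browser:

$ jps | grep HistoryServer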

[Screenshot: Spark History Server in Ambari]

Configuring Apache Spark History Server

Spark History server

Check that $SPARK_HOME/conf/spark-defaults.conf has History Server properties set

spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
spark.history.fs.logDirectory hdfs:///spark-history
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.ui.port 18080
 
spark.history.kerberos.keytab none
spark.history.kerberos.principal none
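
If you would rather not edit spark-defaults.conf, the same event-log properties can also be passed per job on the command line. A sketch using the paths above:

$ spark-submit \
    --conf spark.eventLog.enabled=true \
    --conf spark.eventLog.dir=hdfs:///spark-history \
    spark-sales.py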

Create spark-history directory in HDFS

sudo -u hdfs hadoop fs -mkdir /spark-history

Change the owner of the directory

sudo -u hdfs hadoop fs -chown spark:hdfs /spark-history

Change permission (be more restrictive if necessary)

sudo -u hdfs hadoop fs -chmod 777 /spark-history

Add user spark to group hdfs on the instance where Spark History Server is going to run

sudo usermod -a -G hdfs spark

To view Spark jobs from other users

When you open the History Server and cannot see the Spark jobs you are expecting, check the Spark out file in the Spark log directory. If the error message “Permission denied” is present, the Spark History Server is trying to read the job log file but has no permission to do so.
The spark user should be added to the group of the Spark job owner.
For example, user marko belongs to the group employee. If marko starts a Spark job, the log file for this job will have user and group marko:employee. In order for spark to be able to read the log file, the spark user should be added to the employee group. This is done in the following way:

sudo usermod -a -G employee spark

Checking spark’s groups

groups spark

should return group employee among spark’s groups.
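
For illustration only (the actual group list depends on your environment), the output could look like this:

spark : spark hdfs employee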

Start Spark History server

sudo -u spark $SPARK_HOME/sbin/start-history-server.sh
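
Once the History Server is running, submit a job with event logging enabled and its event log should appear in the configured directory:

$ hdfs dfs -ls /spark-history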

Conclusion

We already know how to code PySpark in Jupyter. As you can see, Jupyter Notebook is a popular application that enables you to create, edit, run, and share Python code in a web view. It allows you to modify and re-execute parts of your code in a very flexible way.
That's why Jupyter is a great tool to test and prototype a program. Once the code is ready, spark-submit lets us run it as a batch job on BDCS and follow it through the Spark UI and the Spark History Server.

Happy learning guys!