One of the really nice things about Spark is its ability to read input files of different formats right out of the box. Useful as this is, reading files in Spark is not always consistent and has kept changing across Spark releases. This article shows you how to read files in `csv` and `json` format to compute word counts on selected fields. The examples assume Spark 2.0+ with Python 3.0 and above. Full working code can be found in this repository.

## Data Files

To illustrate by example, let's make some assumptions about our data files. Assume that we have data files containing a `title` field and a corresponding `text` field. The toy example in `json` format is as follows:

```json
{"title":"Data","text":"Data (/ˈdeɪtə/ DAY-tə, /ˈdætə/ DA-tə, or /ˈdɑːtə/ DAH-tə)[1] is a set of values of qualitative or quantitative variables. An example of qualitative data is an anthropologist's handwritten note about his or her interviews with indigenous people. Pieces of data are individual pieces of information. While the concept of data is commonly associated with scientific research, data is collected by a huge range of organizations and institutions, including businesses (e.g., sales data, revenue, profits, stock price), governments (e.g., crime rates, unemployment rates, literacy rates) and non-governmental organizations (e.g., censuses of the number of homeless people by non-profit organizations). Data is measured, collected and reported, and analyzed, whereupon it can be visualized using graphs, images or other analysis tools. Data as a general concept refers to the fact that some existing information or knowledge is represented or coded in some form suitable for better usage or processing."}
{"title":"Big Data","text":"Big data is a term for data sets that are so large or complex that traditional data processing application software is inadequate to deal with them. Big data challenges include capturing data, data storage, data analysis, search, sharing, transfer, visualization, querying, updating and information privacy."}
```

And the format in `csv` is as follows:
"title","text" "Data","Data (/ˈdeɪtə/ DAY-tə, /ˈdætə/ DA-tə, or /ˈdɑːtə/ DAH-tə)[1] is a set of values of qualitative or quantitative variables. An example of qualitative data is an anthropologist's handwritten note about his or her interviews with indigenous people. Pieces of data are individual pieces of information. While the concept of data is commonly associated with scientific research, data is collected by a huge range of organizations and institutions, including businesses (e.g., sales data, revenue, profits, stock price), governments (e.g., crime rates, unemployment rates, literacy rates) and non-governmental organizations (e.g., censuses of the number of homeless people by non-profit organizations).Data is measured, collected and reported, and analyzed, whereupon it can be visualized using graphs, images or other analysis tools. Data as a general concept refers to the fact that some existing information or knowledge is represented or coded in some form suitable for better usage or processing." "Big Data","Big data is a term for data sets that are so large or complex that traditional data processing application software is inadequate to deal with them. Big data challenges include capturing data, data storage, data analysis, search, sharing, transfer, visualization, querying, updating and information privacy."Assume that we want to compute word counts based on the
text
field.Reading JSON FileReading the json file is actually pretty straightforward, first you create an SQLContext from the spark context. This gives you the capability of querying the json file in regular SQL type syntax.
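The snippets below assume that a SparkContext named `sparkcontext` already exists. If you are running a standalone script, a minimal setup sketch might look like this (the application name is illustrative):

```python
# minimal setup sketch; assumes a standalone pyspark script
from pyspark import SparkContext
from pyspark.sql import SQLContext

sparkcontext = SparkContext(appName="word-count-example")
```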
```python
# create an SQL context so that we can query data files in SQL-like syntax
sqlContext = SQLContext(sparkcontext)
```

In this next step, you use the `sqlContext` to read the json file and select only the `text` field. Remember that we have two fields, `title` and `text`, and in this case we are only going to process the `text` field. This step returns a Spark data frame where each entry is a `Row` object. To access the `text` field in each row, you would use `row.text`.

```python
# read the json data file and select only the field labeled as "text"
# this returns a spark data frame
df = sqlContext.read.json("json_datafile").select("text")
```
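Just to make the `row.text` access concrete, here is a small illustrative check (the slice length is arbitrary):

```python
# illustrative: grab the first row and access its text field as an attribute
first_row = df.first()
print(first_row.text[:80])  # print the first 80 characters of the text
```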
To view what you have just read, you can use `df.show()`:

```python
# just for the heck of it, show 2 results without truncating the fields
df.show(2, False)
```

You should see something like this:
```
+--------------------+
|text                |
+--------------------+
|Data (/ˈdeɪtə/ DAY-tə, /ˈdætə/ DA-tə, or /ˈdɑːtə/ DAH-tə)[1] is a set of values of qualitative or quantitative variables. ...|
|Big data is a term for data sets that are so large or complex that traditional data processing application software is inadequate to deal with them. ...|
+--------------------+
```

(The output is abbreviated here for readability; with `df.show(2, False)` each row prints its full, untruncated text.)
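If you also want to confirm what Spark read from the json file, `printSchema()` is handy; with only the `text` field selected, you should see just the one column (output shown as a comment):

```python
# optional: inspect the schema of the resulting data frame
df.printSchema()
# root
#  |-- text: string (nullable = true)
```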
## Reading the CSV File

Reading the csv file is similar to json, with a small twist to it: you would use `sqlContext.read.load(...)` and provide a format as shown below. Note that this method of reading also works for different file types including `json`, `parquet` and `csv`, and probably others as well.

```python
# create an SQL context so that we can query data files in SQL-like syntax
sqlContext = SQLContext(sparkcontext)

# read the csv data file and select only the field labeled as "text"
# this returns a spark data frame
df = sqlContext.read.load("csv_file",
                          format='com.databricks.spark.csv',
                          header='true',
                          inferSchema='true').select("text")
```

Since the csv data file in this example has a header row, `header='true'` tells Spark to take the column names from it, and `inferSchema='true'` lets Spark infer the column types. This method of reading a file also returns a data frame identical to the previous example on reading a json file.
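As an aside, on Spark 2.0+ csv support is built in, so you do not need the external `com.databricks.spark.csv` package; an equivalent sketch using the built-in format would be:

```python
# equivalent sketch using the built-in csv source (Spark 2.0+)
df = sqlContext.read.load("csv_file",
                          format='csv',
                          header='true',
                          inferSchema='true').select("text")
```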
## Generating Word Counts

Now that we know that reading the csv file or the json file returns identical data frames, we can use a single method to compute the word counts on the `text` field. The idea here is to break the words into tokens for each row entry in the data frame, and return a count of 1 for each token; this is done by the `get_keyval` function shown further below, which returns a list of lists where each inner list contains just the word and a count of 1 (`[w, 1]`). The tokenized words serve as the keys and the corresponding counts as the values. Then, when you reduce by key, you can add up all the counts on a per word (key) basis to get the total count for each word (the `reduceByKey` step). Note that `add` here is a Python function from the `operator` module.
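To see the map/reduce flow in miniature, here is a toy illustration that is independent of the data frames above (the input pairs are hard-coded):

```python
from operator import add

# toy illustration: reduceByKey sums the counts for identical keys
pairs = sparkcontext.parallelize([("data", 1), ("big", 1), ("data", 1)])
print(pairs.reduceByKey(add).collect())  # e.g. [('data', 2), ('big', 1)]
```

With that in mind, the actual pipeline over the data frame looks like this: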
```python
from operator import add

# for each text entry, break it into tokens and assign a count of 1
# we need to use flatMap because we are going from 1 entry to many
mapped_rdd = df.rdd.flatMap(lambda row: get_keyval(row))

# for each identical token (i.e. key) add the counts
# this gets the total count of each word
counts_rdd = mapped_rdd.reduceByKey(add)

# get the final output into a list
word_count = counts_rdd.collect()
```
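At this point, `word_count` is a plain Python list of word/count pairs, so ordinary Python works on it. For example, to print the ten most frequent words (an illustrative snippet, not part of the original pipeline):

```python
# illustrative: sort by count, descending, and print the top ten words
for word, count in sorted(word_count, key=lambda pair: pair[1], reverse=True)[:10]:
    print(word, count)
```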
As you can see below, accessing the `text` field is pretty simple if you are dealing with data frames.

```python
def get_keyval(row):
    # get the text from the row entry
    text = row.text
    # lower case the text and split it by space to get the words
    words = text.lower().split(" ")
    # for each word, send back a count of 1 as a [word, 1] pair
    return [[w, 1] for w in words]
```
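For a quick sanity check, you can call `get_keyval` on a hand-built `Row` (an illustrative test, using a made-up sentence):

```python
from pyspark.sql import Row

# illustrative: get_keyval turns a row into [word, 1] pairs
print(get_keyval(Row(text="Big data is big")))
# [['big', 1], ['data', 1], ['is', 1], ['big', 1]]
```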
And voilà, now you know how to read files with PySpark and use them for some basic processing! For the full source code, please see the links below.
Reposting from kavita-ganesan.com