Pig and Python

By Mansoor Ahmed

Introduction

Pig is a widely used system for executing complex Hadoop MapReduce-based data flows, and Python is a popular language for extending it. Pig adds a layer of abstraction on top of Hadoop's MapReduce mechanisms, so that developers can take a high-level view of the data and the operations on that data.

Pig lets us express operations more explicitly. For instance, we may want to join two or more data sources. Writing a join as a map function and a reduce function is a bit of a drag and is usually worth avoiding. Pig is great here because it simplifies complex tasks. It offers a high-level scripting language that lets users take more of a big-picture view of their data flow.
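To make the point about joins concrete, here is a rough, single-machine sketch of a reduce-side join written in plain Python. It is not Hadoop code; the function names, the "users" and "orders" tags, and the sample records are all assumptions made for illustration.

```python
from collections import defaultdict
from itertools import product


def map_phase(source_name, records, key_index):
    # Tag every record with its source so the reducer can tell the inputs apart.
    for record in records:
        yield record[key_index], (source_name, record)


def shuffle(pairs):
    # Group the tagged records by join key, as the framework would between map and reduce.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(tagged_records):
    # Pair every "users" record with every "orders" record that shares the key.
    left = [rec for tag, rec in tagged_records if tag == "users"]
    right = [rec for tag, rec in tagged_records if tag == "orders"]
    for l_rec, r_rec in product(left, right):
        yield l_rec + r_rec


# Made-up sample data: (id, name) and (user_id, item).
users = [(1, "ann"), (2, "bob")]
orders = [(1, "book"), (1, "pen"), (3, "lamp")]

pairs = list(map_phase("users", users, 0)) + list(map_phase("orders", orders, 0))
joined = []
for key, values in sorted(shuffle(pairs).items()):
    joined.extend(reduce_phase(values))
```

In Pig Latin the same inner join is a single statement along the lines of `joined = JOIN users BY id, orders BY user_id;` (the field names here are hypothetical).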

Pig is particularly appealing because it is extensible, and this article focuses on that extensibility. By the end of this article, we will be able to write PigLatin scripts that execute Python code as part of a larger MapReduce workflow.

Description

Pig is composed of two main parts:

  • A high-level data-flow language called Pig Latin.
  • An engine that parses, optimizes, and executes Pig Latin scripts as a series of MapReduce jobs that run on a Hadoop cluster.

Pig is easy to write, understand, and maintain because it is a data transformation language that lets the processing of data be described as a sequence of transformations. It is also highly extensible through the use of User-Defined Functions (UDFs).

User-Defined Functions (UDFs)

  • A Pig UDF allows custom processing to be written in one of several languages, for example Python.
  • It is a function that is accessible to Pig but written in a language other than PigLatin.
  • Pig lets us register UDFs for use within a PigLatin script.
  • A UDF needs to fit a specific prototype.
  • A typical example of a Pig application is the Extract, Transform, Load (ETL) process.
  • ETL describes how an application extracts data from a data source, transforms the data for querying and analysis purposes, and loads the result onto a target data store.
  • When Pig loads the data, it can perform projections, iterations, and other transformations.
  • UDFs allow more complex algorithms to be applied during the transformation phase.
  • After the data has been processed by Pig, it can be stored back in HDFS.

PigLatin scripts

The simplest Python UDF can be written as:

from pig_util import outputSchema
@outputSchema('word:chararray')
def hi_world():
    return "hello world"

The data output by a function has a particular form. Pig prefers that we specify the schema of that data, because it then knows what it can do with it. That is what the outputSchema decorator is for. There are a couple of different ways to specify a schema. If the function above were saved in a file named myudf.py, we could use it in a PigLatin script as follows:

-- first register it to make it available
REGISTER 'myudf.py' using jython as my_special_udfs;
users = LOAD 'user_data' AS (name: chararray);
hello_users = FOREACH users GENERATE name, my_special_udfs.hi_world();
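Because a Jython UDF is ordinary Python, it can be tried out locally before it is registered with Pig. The sketch below assumes that pig_util may not be importable outside of Pig, so it falls back to a hypothetical stand-in decorator that merely records the schema string. The attribute name output_schema_str is made up for this illustration and is not part of Pig's API.

```python
try:
    # Expected to be importable when the script runs under Pig.
    from pig_util import outputSchema
except ImportError:
    # Hypothetical stand-in for local testing only: it just remembers
    # the schema string on the function and returns the function unchanged.
    def outputSchema(schema_str):
        def decorator(func):
            func.output_schema_str = schema_str
            return func
        return decorator


@outputSchema('word:chararray')
def hi_world():
    return "hello world"


if __name__ == "__main__":
    # Quick local check that the UDF returns what we expect.
    print(hi_world())
```

Running the file directly prints the UDF's return value, which is a cheap sanity check before submitting a job to a cluster.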

UDF arguments

UDFs can take inputs as well as produce outputs. Consider the following UDFs:

def deal_with_a_string(s1):
    return s1 + " for the win!"

def deal_with_two_strings(s1, s2):
    return s1 + " " + s2

def square_a_number(i):
    return i * i

def now_for_a_bag(lBag):
    # A bag arrives as a list of tuples; prefix each tuple with its index.
    lOut = []
    for i, l in enumerate(lBag):
        lNew = (i,) + tuple(l)
        lOut.append(lNew)
    return lOut

These UDFs can be used in a PigLatin script as follows:

REGISTER 'myudf.py' using jython as myudfs;
users = LOAD 'user_data' AS (firstname: chararray, lastname:chararray,some_integer:int);
winning_users    = FOREACH users GENERATE myudfs.deal_with_a_string(firstname);
full_names       = FOREACH users GENERATE myudfs.deal_with_two_strings(firstname,lastname);
squared_integers = FOREACH users GENERATE myudfs.square_a_number(some_integer);
users_by_number = GROUP users by some_integer;
indexed_users_by_number = FOREACH users_by_number GENERATE group,myudfs.now_for_a_bag(users);
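To see what these functions do with Pig's data types, it helps to run them on sample values in plain Python. The sketch below assumes that a bag reaches a Jython UDF as a list of tuples; the sample rows are made up. It uses a variant of now_for_a_bag that builds tuples explicitly, so the concatenation works when each row is a tuple.

```python
def deal_with_two_strings(s1, s2):
    return s1 + " " + s2


def now_for_a_bag(bag):
    # Prefix every tuple in the bag with its position in the bag.
    return [(i,) + tuple(row) for i, row in enumerate(bag)]


# Made-up rows standing in for one group of `users` where some_integer == 7.
sample_bag = [("Ada", "Lovelace", 7), ("Alan", "Turing", 7)]

full_name = deal_with_two_strings("Ada", "Lovelace")
indexed = now_for_a_bag(sample_bag)
```

Here full_name is a single string, while indexed is again a list of tuples, which is the shape Pig expects back for a bag.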

Beyond Standard Python UDFs

We can't use NumPy from Jython. Moreover, Pig doesn't really support Python filter UDFs. The best we can do is something like this:

user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
--add a field that says whether it is naughty (1) or not (0)
messages_with_rudeness = FOREACH user_messages GENERATE name,message,contains_naughty_words(message) as naughty;    
--then filter by the naughty field
filtered_messages = FILTER messages_with_rudeness by (naughty==1);   
-- and finally strip away the naughty field                 
rude_messages = FOREACH filtered_messages GENERATE name,message;
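The script above calls contains_naughty_words without showing it. A minimal sketch of what such a UDF might look like follows. The word list, the schema string 'naughty:int', and the fallback decorator are assumptions made for illustration. In a real script the file would also need to be registered, and the function called through its namespace.

```python
try:
    # Expected to be importable when the script runs under Pig.
    from pig_util import outputSchema
except ImportError:
    # Stand-in so the function can be exercised outside Pig (assumption).
    def outputSchema(schema_str):
        def decorator(func):
            return func
        return decorator


# Hypothetical word list; a real one would be loaded from a file or config.
NAUGHTY_WORDS = frozenset(["darn", "heck"])


@outputSchema('naughty:int')
def contains_naughty_words(message):
    # Return 1 if any word of the message is in the list, otherwise 0.
    # An int is returned because the script filters on naughty == 1.
    if message is None:
        return 0
    words = (w.strip(".,!?;:").lower() for w in message.split())
    return 1 if any(w in NAUGHTY_WORDS for w in words) else 0
```

Returning 0 for a missing message is a design choice of this sketch; it keeps rows with null messages out of the filtered result rather than failing the job.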

Python Streaming UDFs

Pig lets us hook into the Hadoop Streaming API. This allows us to get around the Jython limitations when we need to. Hadoop lets us write mappers and reducers in any language that gives us access to stdin and stdout. That is pretty much any language we want, such as Python 3 or even Cow.

The following is a simple Python streaming script; let's call it simple_stream.py:

#!/usr/bin/env python
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:  # skip empty lines
        continue
    words = line.split()  # split the line by whitespace
    for i, s in enumerate(words):
        # emit a tab-separated key/value pair for each word in the line
        sys.stdout.write("{key}\t{value}\n".format(key=i, value=s))

The idea is to get Hadoop to run the script on each node. The hashbang line (#!) therefore needs to be valid on every node, and each import statement must resolve on every node. Also, any system-level files or resources accessed inside the Python script must be accessible in the same way on every node.
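Before shipping the script to a cluster, its core logic can be checked on a single machine. The sketch below pulls the loop into a hypothetical helper, emit_pairs, and feeds it an in-memory stand-in for stdin. The helper name and the sample text are assumptions made for illustration.

```python
import io


def emit_pairs(lines):
    """Yield one 'index<TAB>word' string for every word on every non-empty line."""
    for line in lines:
        words = line.split()  # an empty or blank line yields no words
        for i, word in enumerate(words):
            yield "{key}\t{value}".format(key=i, value=word)


# In-memory stand-in for sys.stdin, including one blank line.
fake_stdin = io.StringIO(u"hello big world\n\nbye\n")
output = list(emit_pairs(fake_stdin))
```

Keeping the logic in a function that takes any iterable of lines makes it easy to test without touching the real stdin.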

Use with the simple_stream script:

DEFINE stream_alias `simple_stream.py` SHIP('simple_stream.py');
user_messages = LOAD 'user_twits' AS (name:chararray, message:chararray);
just_messages = FOREACH user_messages generate message;
streamed = STREAM just_messages THROUGH stream_alias;
DUMP streamed;

The overall format we are using is:

DEFINE alias `command` SHIP('files');

The alias is the name used to access the streaming function from inside the PigLatin script. The command is the system command Pig calls when it needs to use the streaming function. Finally, SHIP tells Pig which files and dependencies it needs to distribute to the Hadoop nodes for the command to work.
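Conceptually, streaming hands each record to the command as a line on stdin and reads the command's stdout back as records. A rough local imitation of that round trip, using only Python's standard library, is sketched below. The inline script and the sample messages are made up, and the helper stream_through is hypothetical. This only mimics the idea on one machine; it is not how Pig itself launches the command.

```python
import subprocess
import sys

# Inline stand-in for simple_stream.py so the example is self-contained.
STREAM_SCRIPT = r'''
import sys
for line in sys.stdin:
    for i, word in enumerate(line.split()):
        sys.stdout.write("{0}\t{1}\n".format(i, word))
'''


def stream_through(records, script):
    """Pipe records (one per line) through a Python script and return its output lines."""
    result = subprocess.run(
        [sys.executable, "-c", script],
        input="\n".join(records) + "\n",
        capture_output=True,
        text=True,
        check=True,  # raise if the child process exits with an error
    )
    return result.stdout.splitlines()


streamed = stream_through(["good morning", "hello"], STREAM_SCRIPT)
```

Each element of streamed is one tab-separated line written by the child process, which corresponds to one record coming back from the streaming command.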