from pyspark.sql import Row
import random
import struct
from datetime import datetime
#
# TEST Data generation
#
df_a = []
for i in range(0, 10):
    df_a.append(
        (
            i,
            "ABC %s" % (round(i * random.random(), 2)),
            round(i * random.random(), 2),
            datetime.now()
        )
    )
df = spark.createDataFrame(df_a,['id','values','valuef','valued'])
df.show()
df.printSchema()
#
# The converter
#
# Important: make this variable a broadcast so each executor receives the schema only once
dfSchema = sc.broadcast(df.schema)
# The function
def convertFieldsToByteArray(row):
    # read the schema from the broadcast variable
    schema = dfSchema.value

    # conversion functions, one per supported Spark type
    def bigint2bc(data):
        return bytearray(struct.pack('>Q', data))

    def string2bc(data):
        # encode the string as UTF-8 bytes
        return bytearray(str(data), 'utf-8')

    def double2bc(data):
        return bytearray(struct.pack('d', data))

    def timestamp2bc(data):
        # keep only the date part, stored as its ordinal number
        return bytearray(struct.pack('>i', int(data.toordinal())))

    def errorNotfound(n):
        raise Exception(
            'Cannot handle "{}" datatype, no handler'.format(n))

    # map each Spark type name to its conversion function
    convertors = {
        'bigint': bigint2bc,
        'string': string2bc,
        'double': double2bc,
        'timestamp': timestamp2bc
    }

    # convert every field of the row
    o = {}
    for field in schema:
        typeName = field.dataType.simpleString()
        if typeName in convertors:
            o[field.name] = convertors[typeName](row[field.name])
        else:
            errorNotfound(typeName)

    # return the converted row
    return Row(**o)
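# Optional sanity check (a quick sketch): run the converter on a single
# row on the driver before mapping it over the whole RDD.
print(convertFieldsToByteArray(df.first()))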
# map the conversion function over the DataFrame rows and rebuild a DataFrame
df3 = df.rdd.map(convertFieldsToByteArray).toDF()
#
# Test the data
#
df3.show()
df3.printSchema()
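#
# Optional: unpack the first converted row back into Python values.
# A minimal sketch that assumes the pack formats used above
# ('>Q' for bigint, 'd' for double, '>i' date ordinal for timestamp,
# UTF-8 bytes for strings).
#
first = df3.first()
print(struct.unpack('>Q', bytes(first['id']))[0])
print(bytes(first['values']).decode('utf-8'))
print(struct.unpack('d', bytes(first['valuef']))[0])
print(datetime.fromordinal(struct.unpack('>i', bytes(first['valued']))[0]))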