from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# Create a local StreamingContext with two worker threads and a batch interval
# of 1 second (the streaming context is not exercised in this training snippet).
sc = SparkContext("local[2]", "NetworkWordCountML")
ssc = StreamingContext(sc, 1)
spark = SparkSession(sc)

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer,
# hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to the training documents and persist the fitted model.
model = pipeline.fit(training)
model.save("./tests/ml/pipeline")
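# Caveat: model.save(path) raises an error if the target path already exists,
# so re-running this script as-is fails. A minimal alternative using the
# standard MLWriter API overwrites the previous export instead:
#
#     model.write().overwrite().save("./tests/ml/pipeline")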
from pyspark.ml import PipelineModel
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# Create a local StreamingContext with two worker threads and a batch interval
# of 1 second.
sc = SparkContext("local[2]", "NetworkWordCountML_load")
ssc = StreamingContext(sc, 1)
spark = SparkSession(sc)

# Load the pipeline model saved by the training script above.
model = PipelineModel.load("./tests/ml/pipeline")

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on the test documents and print the columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, pred = row  # `pred` avoids shadowing the DataFrame above
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred))
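# The StreamingContext above is otherwise never started. A minimal sketch of
# scoring live text with the loaded pipeline, assuming lines arrive on a local
# socket at port 9999 (e.g. fed with `nc -lk 9999`); the host, port, and the
# score_batch helper are illustrative assumptions, not part of the original:
lines = ssc.socketTextStream("localhost", 9999)

def score_batch(rdd):
    # foreachRDD runs on the driver, so the SparkSession and model are usable
    # here; the pipeline expects a "text" input column.
    if not rdd.isEmpty():
        batch = spark.createDataFrame(rdd.map(lambda line: (line,)), ["text"])
        model.transform(batch).select("text", "prediction").show(truncate=False)

lines.foreachRDD(score_batch)
ssc.start()
ssc.awaitTermination()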