"""AWS Glue job: cluster NYC green-taxi pickup locations with KMeans.

Reads the ``green`` table from the Glue Data Catalog, filters pickups to a
bounding box around NYC, fits a 100-cluster KMeans model on the pickup
coordinates, and writes the transformed rows (with cluster assignments) to
S3 as Parquet.
"""
import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import (
    DynamicFrame,
    DynamicFrameCollection,
    DynamicFrameReader,
    DynamicFrameWriter,
)
from awsglue.job import Job
from awsglue.transforms import RenameField, SelectFields
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Job input parameters: output location and catalog source table.
destination = "s3://luiscarosnaprds/gluescripts/results/ClusterResults3.parquet"
namespace = "nyc-transportation-version2"
tablename = "green"

sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Load the source table from the Glue Data Catalog and keep only the
# columns the job needs downstream.
# NOTE: `database=` is the documented keyword (the original used the legacy
# `name_space=` alias).
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=namespace,
    table_name=tablename,
)
selected = SelectFields.apply(
    frame=datasource0,
    paths=["trip_distance", "fare_amount", "pickup_longitude", "pickup_latitude"],
)
df = selected.toDF()

# Drop rows whose pickup coordinates fall outside a rough NYC bounding box
# (removes nulls/zeros and GPS noise).
df_filtered = df.filter(
    "pickup_latitude > 40.472278 AND pickup_latitude < 41.160886 "
    "AND pickup_longitude > -74.300074 AND pickup_longitude < -71.844077"
)

# Assemble the coordinate columns into the single vector column that
# Spark ML estimators require.
features = ["pickup_longitude", "pickup_latitude"]
assembler = VectorAssembler(inputCols=features, outputCol='features')
assembled_df = assembler.transform(df_filtered)

# Fit KMeans (k=100, fixed seed for reproducibility) and append the
# cluster-assignment column to every row.
kmeans = KMeans(k=100, seed=1)
model = kmeans.fit(assembled_df)
transformed = model.transform(assembled_df)

# Persist the clustered rows to S3 as Parquet, replacing any prior run.
transformed.write.mode('overwrite').parquet(destination)

job.commit()