Parsing the data
NycTaxiCsvParser takes in the data from NycTaxiDataReader. It simply splits each line on commas and outputs a Map<String, String> containing the individual fields. This is the definition of the input port of the NycTaxiCsvParser operator:
    public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() {
        @Override
        public void process(String tuple) {
            String[] values = tuple.split(",");
            Map<String, String> outputTuple = new HashMap<>();
            // Skip lines with too few fields or a non-numeric first field
            // (for example, a header row).
            if (values.length > 18 && StringUtils.isNumeric(values[0])) {
                outputTuple.put("pickup_time", values[1]);
                outputTuple.put("pickup_lon", values[5]);
                outputTuple.put("pickup_lat", values[6]);
                outputTuple.put("total_fare", values[18]);
                output.emit(outputTuple);
            }
        }
    };
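To try the parsing logic of the input port above outside of an Apex application, the following is a minimal stand-alone sketch. The class name CsvLineParserSketch, the helper isAllDigits (a plain-Java stand-in for StringUtils.isNumeric) and the sample row are assumptions made for illustration; they are not part of the original operator, and the sample row is not real taxi data.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; not part of the NycTaxiCsvParser operator.
class CsvLineParserSketch {

    // Stand-in for StringUtils.isNumeric: true only for a non-empty string of digits.
    static boolean isAllDigits(String s) {
        if (s == null || s.isEmpty()) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            if (!Character.isDigit(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    // Returns the four fields of interest, or null when the operator would skip the line.
    static Map<String, String> parse(String line) {
        String[] values = line.split(",");
        if (values.length > 18 && isAllDigits(values[0])) {
            Map<String, String> fields = new HashMap<>();
            fields.put("pickup_time", values[1]);
            fields.put("pickup_lon", values[5]);
            fields.put("pickup_lat", values[6]);
            fields.put("total_fare", values[18]);
            return fields;
        }
        return null;
    }

    public static void main(String[] args) {
        // Build a made-up row with 19 values so that index 18 exists.
        String[] row = new String[19];
        Arrays.fill(row, "0");
        row[0] = "2";
        row[1] = "2016-01-01 00:00:00";
        row[5] = "-73.99";
        row[6] = "40.73";
        row[18] = "12.5";

        Map<String, String> parsed = parse(String.join(",", row));
        System.out.println(parsed.get("pickup_time")); // prints "2016-01-01 00:00:00"
        System.out.println(parsed.get("total_fare"));  // prints "12.5"

        // A header-like line is skipped because its first field is not numeric.
        System.out.println(parse("VendorID,pickup_datetime")); // prints "null"
    }
}
```

Note that String.split(",") drops trailing empty strings, so a line whose last fields are empty can yield fewer than 19 values and would be skipped by the length check.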
As mentioned previously, we are only interested in the pickup time (key pickup_time), the pickup lat-lon coordinates (keys pickup_lon and pickup_lat), and the total payment (Key...