Using Python and Spark to Research Climate Change, Part 1
by Kaya Kupferschmidt, December 2020



2.1 Hardware Requirements

2.2 Software Requirements

# Create the conda environment, activate it and start JupyterLab
conda env create -n weather -f environment.yml
conda activate weather
jupyter-lab --ip=0.0.0.0 --port=8888
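The code in the following sections runs inside the notebook and assumes that a SparkSession called spark is available and that the PySpark functions module is imported as f. The original setup cell is not shown in this excerpt; a minimal sketch could look like this (the local master and the driver memory setting are assumptions, adjust them to your machine):

import os

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

# Minimal local SparkSession; driver memory is only an assumption
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("weather") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()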

3.1 Choosing a Data Source

Weather stations map (source: NOAA)
The number of weather stations per year (source: NOAA)

3.2 Downloading the Data
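The exact download commands are not shown in this excerpt. As a rough sketch, the station master data and the per-year measurement files can be fetched from the public NOAA ISD archive; the URL and the local target directory below are assumptions, so adjust them to where you actually want the data:

import os
import urllib.request

# Assumed location of the NOAA ISD archive and of the local data directory
NOAA_BASE = "https://www.ncei.noaa.gov/pub/data/noaa"
weather_basedir = "data/weather"

# Download the station master data; the per-year measurement files live in
# sub-directories such as NOAA_BASE + "/2013" and can be fetched the same way
os.makedirs(weather_basedir, exist_ok=True)
urllib.request.urlretrieve(
    NOAA_BASE + "/isd-history.csv",
    os.path.join(weather_basedir, "isd-history.csv"),
)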

4.1 Raw Data Organization
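The processing code below relies on the raw measurement files being organized into one sub-directory per year below weather_basedir. A quick, hypothetical sanity check of the local layout:

# Hypothetical check: list the per-year sub-directories of the raw data
years = sorted(d for d in os.listdir(weather_basedir) if d.isdigit())
print(f"{len(years)} year directories, from {years[0]} to {years[-1]}")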

4.2 Converting Master Data

weather_stations = spark.read \
    .option("header", True) \
    .csv(weather_basedir + "/isd-history.csv")
weather_stations.limit(10).toPandas()

# Rename columns whose names contain spaces or special characters,
# so they can be written to Parquet
weather_stations = weather_stations \
    .withColumnRenamed("STATION NAME", "STATION_NAME") \
    .withColumnRenamed("ELEV(M)", "ELEVATION")
weather_stations.write.mode("overwrite").parquet(stations_location)
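As a quick check (not part of the original code), the converted master data can be read back from its Parquet location:

# Optional sanity check: read the station master data back from Parquet
stations = spark.read.parquet(stations_location)
stations.printSchema()
print(stations.count(), "weather stations")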

4.3 Extracting Measurement Data

raw_weather = spark.read.text(weather_basedir + "/2013")
raw_weather.limit(10).toPandas()
Raw weather measurement data
raw_weather.select(
        f.substring(raw_weather["value"], 5, 6).alias("usaf"),
        f.substring(raw_weather["value"], 11, 5).alias("wban"),
        f.to_timestamp(
            f.substring(raw_weather["value"], 16, 12),
            "yyyyMMddHHmm").alias("ts"),
        (f.substring(raw_weather["value"], 88, 5).cast("float") /
            10.0).alias("air_temperature"),
        f.substring(raw_weather["value"], 93, 1)
            .alias("air_temperature_qual")
    ) \
    .withColumn("date", f.to_date(f.col("ts"))) \
    .limit(10).toPandas()
def extract_weather_measurements(raw_weather):
    df = raw_weather.select(
        raw_weather["value"],
        f.substring(raw_weather["value"], 5, 6).alias("usaf"),
        f.substring(raw_weather["value"], 11, 5).alias("wban"),
        # More code is omitted here
    )
    # Extract the (up to) four precipitation groups from the raw record
    df = extract_precipitation(df, 1, 109)
    df = extract_precipitation(df, 2, 120)
    df = extract_precipitation(df, 3, 131)
    df = extract_precipitation(df, 4, 142)
    return df.drop("value")
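The extract_precipitation helper is not included in this excerpt. Judging only from how it is called above (four calls whose starting offsets are 11 characters apart), a plausible sketch could look as follows; the exact field layout is an assumption based on the fixed-width ISD precipitation groups, not the author's original code:

def extract_precipitation(df, idx, start):
    # Hypothetical reconstruction of the helper (not shown in the article).
    # Assumes an 11-character precipitation group starting at `start`:
    # 3-char identifier ("AA1".."AA4"), 2-char accumulation period in hours,
    # 4-char depth in tenths of mm, 1-char condition code, 1-char quality code.
    prefix = f"precipitation_{idx}"
    return df \
        .withColumn(prefix + "_hours",
                    f.substring(df["value"], start + 3, 2).cast("int")) \
        .withColumn(prefix + "_depth",
                    f.substring(df["value"], start + 5, 4).cast("float") / 10.0) \
        .withColumn(prefix + "_qual",
                    f.substring(df["value"], start + 10, 1))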

4.4 Processing the Full History

# Set this to `True` if you want to force reprocessing
force = False

for i in range(1901, 2021):
    source_dir = os.path.join(weather_basedir, str(i))
    target_dir = os.path.join(hourly_weather_location, "year=" + str(i))
    if force or not path_exists(target_dir):
        print(f"Processing year {i} to {target_dir}")
        # Read in raw data
        raw_weather = spark.read.text(source_dir)
        # Extract all measurements
        weather = extract_weather_measurements(raw_weather)
        # Repartition (i.e. shuffle) the data.
        # This will ensure that all output files have a similar size
        weather = weather.repartition(32)
        # Write results as Parquet files
        weather.write.mode("overwrite").parquet(target_dir)
    else:
        print(f"Skipping year {i} in {target_dir}")



