Как найти кратчайший маршрут с помощью Apache Spark и GraphFrames
Разбираем кейс на реальных данных из OpenStreetMap — ищем оптимальный маршрут.
Что делаем:
1. Загружаем граф дорог города с помощью OSMnx.
2. Сохраняем вершины и рёбра с координатами, скоростями и геометрией.
3. Загружаем всё в Spark.
4. Находим кратчайший путь с помощью GraphFrames.
1. Скачиваем карту и строим граф улиц
import matplotlib.pyplot as plt  # was used below (plt.show) but never imported
import osmnx as ox

# Download the drivable road network of Moscow from OpenStreetMap.
G = ox.graph.graph_from_place("Moscow", network_type="drive")

# Draw the roads on top of the city boundary polygon.
moscow_gdf = ox.geocoder.geocode_to_gdf("Moscow")
fig, ax = ox.plot.plot_graph(
    G,
    show=False,
    close=False,  # keep the figure open so the boundary can be drawn into it
    bgcolor="#111111",
    edge_color="#ffcb00",
    edge_linewidth=0.3,
    node_size=0,
)
# zorder=-1 places the boundary polygon underneath the road edges.
moscow_gdf.plot(ax=ax, fc="#444444", ec=None, lw=1, alpha=1, zorder=-1)

# Frame the map around the city boundary, padded by 2% of its extent per side.
margin = 0.02
west, south, east, north = moscow_gdf.union_all().bounds
margin_ns = (north - south) * margin
margin_ew = (east - west) * margin
ax.set_ylim((south - margin_ns, north + margin_ns))
ax.set_xlim((west - margin_ew, east + margin_ew))
plt.show()
2. Сохраняем геометрическое описание города в формате GeoJSON и данные о вершинах и рёбрах в формате CSV
import csv

# Save the city boundary as GeoJSON.
with open("Moscow.geojson", "w", encoding="utf-8") as file:
    file.write(moscow_gdf.to_json())

# Save graph vertices as CSV: node id, latitude, longitude.
# Mode "w" rather than "a": appending would add a second header and duplicate
# every row each time the script is re-run.
nodes = G.nodes(data=True)
with open("nodes.csv", "w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    writer.writerow(["id", "lat", "lon"])
    for node, data in nodes:
        # "y" is written as latitude and "x" as longitude, matching the header.
        writer.writerow(["%d" % node, "%f" % data.get("y"), "%f" % data.get("x")])

# Edge view (src, dst, attribute dict) consumed by the edges.csv export.
edges = G.edges(data=True)
def decode_maxspeed(maxspeed):
match maxspeed:
case str():
match maxspeed.lower():
case "ru:urban": return 60
case "ru:rural": return 90
case "ru:living_street": return 20
case "ru:motorway": return 110
case _: return int(maxspeed)
case list(): return min(list(map(decode_maxspeed, maxspeed)))
case _: return maxspeed
import csv

import shapely.wkt
from shapely.geometry import LineString

# Speed used for edges that carry no maxspeed tag. 60 km/h matches the value
# used for RU:urban; a huge placeholder such as 999 would make untagged roads
# look almost free once travel time is computed as length / maxspeed.
DEFAULT_MAXSPEED = 60

# Mode "w" rather than "a" so a re-run does not duplicate header and rows.
with open("edges.csv", "w", newline="", encoding="utf-8") as file:
    # csv.writer quotes any field containing a comma. WKT geometry always does
    # ("LINESTRING (x1 y1, x2 y2, ...)"), so unquoted output would split it
    # across several CSV columns.
    writer = csv.writer(file)
    writer.writerow(["src", "dst", "maxspeed", "length", "geometry"])
    for src, dst, data in edges:
        maxspeed = decode_maxspeed(data.get("maxspeed", DEFAULT_MAXSPEED))
        length = float(data.get("length"))
        geometry = data.get("geometry")
        if geometry is None:
            # OSMnx edges may lack a "geometry" attribute (typically straight
            # segments); fall back to the line between the two endpoint nodes.
            geometry = LineString(
                [
                    (G.nodes[src]["x"], G.nodes[src]["y"]),
                    (G.nodes[dst]["x"], G.nodes[dst]["y"]),
                ]
            )
        writer.writerow(
            ["%d" % src, "%d" % dst, "%d" % maxspeed, "%f" % length, shapely.wkt.dumps(geometry)]
        )
3. Используем библиотеку GraphFrames для обработки графов на Apache Spark
from graphframes import GraphFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Local Spark session; the GraphFrames package is fetched via spark.jars.packages.
# The builder chain is wrapped in parentheses: without explicit continuation
# the leading ".config(...)" lines are a SyntaxError.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
    .master("local[*]")
    .appName("GraphFrames")
    .getOrCreate()
)

# Without a schema, every CSV column is read as a string. GraphFrame needs an
# "id" column on vertices and "src"/"dst" on edges, which the exported headers
# provide.
nodes = spark.read.options(header=True).csv("nodes.csv")
edges = spark.read.options(header=True).csv("edges.csv")

# Per-edge travel-time proxy: length / maxspeed. Cast explicitly rather than
# relying on implicit string-to-number coercion in the division.
edgesT = edges.withColumn(
    "time",
    F.col("length").cast("double") / F.col("maxspeed").cast("double"),
)

# Build the graph.
g = GraphFrame(nodes, edgesT)
4. Ищем кратчайший путь (по числу рёбер: `shortestPaths` не учитывает веса, в том числе столбец `time`)
Например, от Измайлово (OSM-узел 257601812) до точки назначения (OSM-узел 5840593081):
from pyspark.sql import functions as F  # F was used below but never imported

# OSM node ids of the route's start and end points.
src = "257601812"
dst = "5840593081"

# NOTE: GraphFrame.shortestPaths is unweighted. It returns, for every vertex,
# a "distances" map of hop counts (number of edges) to each landmark. The
# per-edge "time" column is therefore NOT taken into account here.
paths = g.shortestPaths(landmarks=[dst])
paths.filter(F.col("id") == src).show(truncate=False)
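💡 Результат: 40 шагов (рёбер графа) от точки A до точки B. Обратите внимание: `shortestPaths` в GraphFrames ищет кратчайший путь по числу рёбер без учёта весов, поэтому вычисленное время прохождения (`time`) в этом расчёте не участвует.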
Такой подход легко масштабируется на миллионы маршрутов. Используйте Spark и GraphFrames для построения логистических моделей, маршрутизации и городского планирования.
Хотите прокачаться в работе с Big Data? Изучайте Spark! Записывайтесь на курс Spark Developer от OTUS — учитесь на реальных данных и продвинутых кейсах: https://vk.cc/cMT1Wg
Реклама. ООО «Отус онлайн-образование», ОГРН 1177746618576
>>Click here to continue<<