 DylanMao

V1

2023/03/07 · 阅读：28 · 主题：红绯

# 基于 Ray 多进程调度管理能力优化 networkx 节点最短路径的并行计算

``def f_one():    degree_centrality_result = betweenness_centrality(G)    out = []    for i, j in degree_centrality_result.items():        out.append({"user_id": i, "weight": j})    pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")# 读取2020年企业年报因果关系数据集。bracket_name = json.load(open("2020_extract_result_center.json", "r"))graph = {}for i in tqdm(list(bracket_name)):    if (i, i) not in graph:        graph[(i, i)] = 1    else:        graph[(i, i)] += 1print(len(graph))graph_relation = []for graph_id, graph_count in graph.items():    if graph_count > 1:        graph_relation.append(graph_id)print(len(graph_relation))graph_relation = graph_relation[:10000]G = nx.Graph()G.add_edges_from(graph_relation)  # 添加多条边ray_G = ray.put(G)f_one()``

``def betweenness_centrality(        G, k=None, normalized=True, weight=None, endpoints=False, seed=None):    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G    if k is None:        nodes = G    else:        nodes = seed.sample(list(G.nodes()), k)    ray_betweenness = [shortest_path_basic.remote(s, ray_betweenness) for s in nodes]    betweenness = ray.get(ray_betweenness)    betweenness_compute = {}    for betweenness_one in betweenness:        for i,j in betweenness_one.items():            if i in betweenness_compute:                betweenness_compute[i]+=j            else:                betweenness_compute[i]=j    betweenness = _rescale(        betweenness_compute,        len(G),        normalized=normalized,        directed=G.is_directed(),        k=k,        endpoints=endpoints,    )    return betweenness``

``@ray.remotedef shortest_path_basic(s, ray_betweenness=None):    G = ray.get(ray_G)    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G    # single source shortest paths    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)    # accumulation    betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)#     ray_betweenness = ray.put(betweenness)    del G    return betweenness``

``import jsonimport networkx as nximport pandas as pdfrom collections import dequefrom networkx.algorithms.shortest_paths.weighted import _weight_functionfrom tqdm import tqdm# helpers for betweenness centralitydef _single_source_shortest_path_basic(G, s):    S = []    P = {}    for v in G:        P[v] = []    sigma = dict.fromkeys(G, 0.0)  # sigma[v]=0 for v in G    D = {}    sigma[s] = 1.0    D[s] = 0    Q = deque([s])    while Q:  # use BFS to find shortest paths        v = Q.popleft()        S.append(v)        Dv = D[v]        sigmav = sigma[v]        for w in G[v]:            if w not in D:                Q.append(w)                D[w] = Dv + 1            if D[w] == Dv + 1:  # this is a shortest path, count paths                sigma[w] += sigmav                P[w].append(v)  # predecessors    return S, P, sigma, Ddef _accumulate_basic(betweenness, S, P, sigma, s):    delta = dict.fromkeys(S, 0)    while S:        w = S.pop()        coeff = (1 + delta[w]) / sigma[w]        for v in P[w]:            delta[v] += sigma[v] * coeff        if w != s:            betweenness[w] += delta[w]    return betweenness, deltadef _accumulate_endpoints(betweenness, S, P, sigma, s):    betweenness[s] += len(S) - 1    delta = dict.fromkeys(S, 0)    while S:        w = S.pop()        coeff = (1 + delta[w]) / sigma[w]        for v in P[w]:            delta[v] += sigma[v] * coeff        if w != s:            betweenness[w] += delta[w] + 1    return betweenness, deltadef _accumulate_edges(betweenness, S, P, sigma, s):    delta = dict.fromkeys(S, 0)    while S:        w = S.pop()        coeff = (1 + delta[w]) / sigma[w]        for v in P[w]:            c = sigma[v] * coeff            if (v, w) not in betweenness:                betweenness[(w, v)] += c            else:                betweenness[(v, w)] += c            delta[v] += c        if w != s:            betweenness[w] += delta[w]    return betweennessdef _rescale(betweenness, n, normalized, directed=False, k=None, 
endpoints=False):    if normalized:        if endpoints:            if n < 2:                scale = None  # no normalization            else:                # Scale factor should include endpoint nodes                scale = 1 / (n * (n - 1))        elif n <= 2:            scale = None  # no normalization b=0 for all nodes        else:            scale = 1 / ((n - 1) * (n - 2))    else:  # rescale by 2 for undirected graphs        if not directed:            scale = 0.5        else:            scale = None    if scale is not None:        if k is not None:            scale = scale * n / k        for v in betweenness:            betweenness[v] *= scale    return betweennessdef _rescale_e(betweenness, n, normalized, directed=False, k=None):    if normalized:        if n <= 1:            scale = None  # no normalization b=0 for all nodes        else:            scale = 1 / (n * (n - 1))    else:  # rescale by 2 for undirected graphs        if not directed:            scale = 0.5        else:            scale = None    if scale is not None:        if k is not None:            scale = scale * n / k        for v in betweenness:            betweenness[v] *= scale    return betweennessimport rayray.init()@ray.remotedef shortest_path_basic(s, ray_betweenness=None):    G = ray.get(ray_G)    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G    # single source shortest paths    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)    # accumulation    betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)#     ray_betweenness = ray.put(betweenness)    del G    return betweennessdef betweenness_centrality(        G, k=None, normalized=True, weight=None, endpoints=False, seed=None):    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G    if k is None:        nodes = G    else:        nodes = seed.sample(list(G.nodes()), k)    ray_betweenness = ray.put(betweenness)    ray_betweenness = [shortest_path_basic.remote(s, ray_betweenness) for s in nodes]  
  betweenness = ray.get(ray_betweenness)#     print(betweenness[:3])    # rescaling#     json.dump(betweenness, open("betweenness.json", "w", encoding="utf-8"), ensure_ascii=False)    betweenness_compute = {}    for betweenness_one in betweenness:        for i,j in betweenness_one.items():            if i in betweenness_compute:                betweenness_compute[i]+=j            else:                betweenness_compute[i]=j    betweenness = _rescale(        betweenness_compute,        len(G),        normalized=normalized,        directed=G.is_directed(),        k=k,        endpoints=endpoints,    )    return betweenness@ray.remotedef f(index):    bracket_name = ray.get(bracket_name_ray)    graph = {}    print(index)    for i in tqdm(list(bracket_name)):        if (i, i) not in graph:            graph[(i, i)] = 1        else:            graph[(i, i)] += 1    print(len(graph))    graph_relation = []    for graph_id, graph_count in graph.items():        if graph_count > 1:            graph_relation.append(graph_id)    G = nx.Graph()    G.add_edges_from(graph_relation)  # 添加多条边    degree_centrality_result = betweenness_centrality(G)    out = []    for i, j in degree_centrality_result.items():        out.append({"user_id": i, "weight": j})    pd.DataFrame(out).to_csv("betweenness_centrality_result" + str(index) + ".csv")def f_one():    degree_centrality_result = betweenness_centrality(G)    out = []    for i, j in degree_centrality_result.items():        out.append({"user_id": i, "weight": j})    pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")bracket_name = json.load(open("2020_extract_result_center.json", "r"))bracket_name_ray = ray.put(bracket_name)# futures = [f.remote(i) for i in range(len(bracket_name) // 3000)]# print(ray.get(futures))  # [0, 1, 4, 9]bracket_name = ray.get(bracket_name_ray)graph = {}for i in tqdm(list(bracket_name)):    if (i, i) not in graph:        graph[(i, i)] = 1    else:        graph[(i, i)] += 1print(len(graph))graph_relation 
= []for graph_id, graph_count in graph.items():    if graph_count > 1:        graph_relation.append(graph_id)print(len(graph_relation))graph_relation = graph_relation[:10000]G = nx.Graph()G.add_edges_from(graph_relation)  # 添加多条边ray_G = ray.put(G)f_one()`` V1