Cypher(6) neo4j python driver

Create basic nodes and relationships

Posting information

  • Posting Date : 29-07-2024
  • Last Edit : 29-07-2024
  • Writer : KWON Bongjae

Environment information

  • Hardware : MacBookPro8,2(late 11) / 8 × Intel® Core™ i7-2675QM CPU @ 2.20GHz / RAM 15.5 GiB / Mesa Intel® HD Graphics 3000
  • OS : Linux(openSUSE Tumbleweed 20240716)
  • DB : ArcadeDB, Neo4j Community 5.21.2 (using Cypher / openCypher)

Posting detail

from neo4j import GraphDatabase, RoutingControl

  
  

URI = "neo4j://localhost:   "

AUTH = ("   ", "   ")

  
  

def add_friend(driver, name, friend_name):

driver.execute_query(

"MERGE (a:Character {name: $name}) "

"MERGE (friend:Character {name: $friend_name}) "

"MERGE (a)-[:Is_friend_Of]->(friend)",

name=name, friend_name=friend_name, database_="neo4j",

)

  
  

def print_friends(driver, name):

records, _, _ = driver.execute_query(

"MATCH (a:Person)-[:Is_Friend_Of]->(friend) WHERE a.name = $name "

"RETURN friend.name ORDER BY friend.name",

name=name, database_="neo4j", routing_=RoutingControl.READ,

)

for record in records:

print(record["friend.name"])

  
  

with GraphDatabase.driver(URI, auth=AUTH) as driver:

add_friend(driver, "Pooh", "Piglet")

add_friend(driver, "Pooh", "Robin")

add_friend(driver, "Robin", "Alice")

print_friends(driver, "Robin")



import logging

  

from neo4j import GraphDatabase, RoutingControl

from neo4j.exceptions import DriverError, Neo4jError

  
  

class App:

  

def __init__(self, uri, user, password, database=None):

self.driver = GraphDatabase.driver(uri, auth=(user, password))

self.database = database

  

def close(self):

self.driver.close()

  

def create_friendship(self, chracter_1, charcter_2):

with self.driver.session() as session:

result = self._create_and_return_friendship(

chracter_1, charcter_2

)

print("Created friendship between: "

f"{result['p1']}, {result['p2']}")

  

def _create_and_return_friendship(self, character_1, character_2):

  

query = (

"CREATE (c1:Character { name: $character_1 }) "

"CREATE (c2:Chracter { name: $character_2 }) "

"CREATE (c1)-[:KNOWS]->(c2) "

"RETURN c1.name, c2.name"

)

try:

record = self.driver.execute_query(

query, character_1=character_1, character_2=character_2,

database_=self.database,

result_transformer_=lambda r: r.single(strict=True)

)

return {"p1": record["c1.name"], "p2": record["c2.name"]}

except (DriverError, Neo4jError) as exception:

logging.error("%s raised an error: \n%s", query, exception)

raise

  

def find_character(self, character_name):

names = self._find_and_return_person(character_name)

for name in names:

print(f"Found character: {name}")

  

def _find_and_return_person(self, character_name):

query = (

"MATCH (p:Person) "

"WHERE p.name = $character_name "

"RETURN p.name AS name"

)

names = self.driver.execute_query(

query, character_name=character_name,

database_=self.database, routing_=RoutingControl.READ,

result_transformer_=lambda r: r.value("name")

)

return names

  

if __name__ == "__main__":

scheme = "neo4j"

host_name = "localhost"

port = 7687

uri = f"{scheme}://{host_name}:{port}"

user = "neo4j"

password = "   "

database = "   "

app = App(uri, user, password, database)

try:

app.create_friendship("Pooh", "Robin")

app.find_character("Pooh")

finally:

app.close()