records, summary, keys = driver.execute_query(""" MATCH (p:Person)-[:KNOWS]->(:Person) RETURN p.name AS name """, database_="<database-name>")
# Loop through results and do something with themfor record in records:
print(record.data()) # get record as dict# Summary informationprint("The query `{query}` returned {records_count} records in {time} ms.".format(
query=summary.query,
records_count=len(records),
time=summary.result_available_after
))
summary 为服务器返回的执行摘要。
更新数据 (MATCH/SET)
records, summary, keys = driver.execute_query(""" MATCH (p:Person {name: $name}) SET p.age = $age """, name="Alice", age=42, database_="<database-name>")
print(f"Query counters: {summary.counters}.")
# This does not delete _only_ p, but also all its relationships!
records, summary, keys = driver.execute_query(""" MATCH (p:Person {name: $name}) DETACH DELETE p """, name="Alice", database_="<database-name>")
print(f"Query counters: {summary.counters}.")
# from neo4j.exceptions import Neo4jErrortry:
driver.execute_query('MATCH (p:Person) RETURN', database_='<database-name>')
except Neo4jError as e:
if e.find_by_gql_status('42001'):# Neo.ClientError.Statement.SyntaxError# special handling of syntax error in queryprint(e.message)
elif e.find_by_gql_status('42NFF'):# Neo.ClientError.Security.Forbidden# special handling of user not having CREATE permissionsprint(e.message)
else:# handling of all other exceptionsprint(e.message)
import neo4j
import pandas
pandas_df = driver.execute_query("UNWIND range(1, 10) AS n RETURN n, n+1 AS m", database_="<database-name>", result_transformer_=neo4j.Result.to_df)
print(type(pandas_df))# <class 'pandas.core.frame.DataFrame'>
自定义返回结果
# Get a single record (or an exception) and the summary from a result.defget_single_person(result):
record = result.single(strict=True)
summary = result.consume()
return record, summary
record, summary = driver.execute_query("MERGE (a:Person {name: $name}) RETURN a.name AS name", name="Alice", database_="<database-name>", result_transformer_=get_single_person,)
print("The query `{query}` returned {record} in {time} ms.".format(
query=summary.query,
record=record,
time=summary.result_available_after
))
# Get exactly 5 records, or an exception.defexactly_5(result):
records = result.fetch(5)
iflen(records) != 5:
raise Exception(f"Expected exactly 5 records, found only {len(records)}.")
if result.peek():
raise Exception("Expected exactly 5 records, found more.")
return records
records = driver.execute_query(""" UNWIND ['Alice', 'Bob', 'Laura', 'John', 'Patricia'] AS name MERGE (a:Person {name: name}) RETURN a.name AS name """, database_="<database-name>", result_transformer_=exactly_5,)
# 事务函数回调负责运行查询defmatch_person_nodes(tx, name_filter):
""" @param tx: session 对象 @param name_filter: 查询过滤 @return: List, 这里不应当返回 result 对象,这样会导致事务提交 """# 使用此方法 Transaction.run() 运行查询。每次查询运行都会返回一个 Result 对象
result = tx.run(""" MATCH (p:Person) WHERE p.name STARTS WITH $filter RETURN p.name AS name ORDER BY name """, filter=name_filter)
# 使用以下任何方法处理结果 Result。returnlist(result) # a list of Record objects# 创建会话。一个会话可以容纳多个查询。除非使用 with 构造函数创建,否则请记住在完成后关闭会话。with driver.session(database="<database-name>") as session:
# .execute_read()(or ) 方法.execute_write() 是事务的入口点。它接受一个事务函数的回调函数以及任意数量的位置参数和关键字参数,这些参数将传递给事务函数。
people = session.execute_read(match_person_nodes, "Al")
for person in people:
print(person.data()) # obtain dict representation
一个包含附加到交易的元数据的字典。这些元数据会被记录在服务器中 query.log,并显示在 Cypher 命令的输出中 SHOW TRANSACTIONS。使用此功能可以标记交易
from neo4j import unit_of_work
@unit_of_work(timeout=5, metadata={"app_name":"people_tracker"})defcount_people(tx):
result = tx.run("MATCH (a:Person) RETURN count(a) AS people")
record = result.single()
return record["people"]
with driver.session(database="<database-name>") as session:
people_n = session.execute_read(count_people)
with driver.session(database="<database-name>") as session:
with session.begin_transaction() as tx:
# use tx.run() to run queries and tx.commit() when done
tx.run("<QUERY 1>")
tx.run("<QUERY 2>")
tx.commit()
from neo4j import GraphDatabase
URI = "<database-uri>"
AUTH = ("<username>", "<password>")
employee_threshold = 10defmain():
with GraphDatabase.driver(URI, auth=AUTH) as driver:
with driver.session(database="<database-name>") as session:
for i inrange(100):
name = f"Thor{i}"
org_id = session.execute_write(employ_person_tx, name)
print(f"User {name} added to organization {org_id}")
defemploy_person_tx(tx, name):
# Create new Person node with given name, if not exists already
result = tx.run(""" MERGE (p:Person {name: $name}) RETURN p.name AS name """, name=name)
# Obtain most recent organization ID and the number of people linked to it
result = tx.run(""" MATCH (o:Organization) RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employees_n ORDER BY o.created_date DESC LIMIT 1 """)
org = result.single()
if org isnotNoneand org["employees_n"] == 0:
raise Exception("Most recent organization is empty.")
# Transaction will roll back -> not even Person is created!# If org does not have too many employees, add this Person to thatif org isnotNoneand org.get("employees_n") < employee_threshold:
result = tx.run(""" MATCH (o:Organization {id: $org_id}) MATCH (p:Person {name: $name}) MERGE (p)-[r:WORKS_FOR]->(o) RETURN $org_id AS id """, org_id=org["id"], name=name)
# Otherwise, create a new Organization and link Person to itelse:
result = tx.run(""" MATCH (p:Person {name: $name}) CREATE (o:Organization {id: randomuuid(), created_date: datetime()}) MERGE (p)-[r:WORKS_FOR]->(o) RETURN o.id AS id """, name=name)
# Return the Organization ID to which the new Person ends up inreturn result.single()["id"]
if __name__ == "__main__":
main()
自定义事务提交
import neo4j
URI = "<database-uri>"
AUTH = ("<username>", "<password>")
defmain():
with neo4j.GraphDatabase.driver(URI, auth=AUTH) as driver:
customer_id = create_customer(driver)
other_bank_id = 42
transfer_to_other_bank(driver, customer_id, other_bank_id, 999)
defcreate_customer(driver):
result, _, _ = driver.execute_query(""" MERGE (c:Customer {id: rand()}) RETURN c.id AS id """, database_="<database-name>")
return result[0]["id"]
deftransfer_to_other_bank(driver, customer_id, other_bank_id, amount):
with driver.session(database="<database-name>") as session:
with session.begin_transaction() as tx:
ifnot customer_balance_check(tx, customer_id, amount):
# give upreturn other_bank_transfer_api(customer_id, other_bank_id, amount)
# Now the money has been transferred => can't rollback anymore# (cannot rollback external services interactions)try:
decrease_customer_balance(tx, customer_id, amount)
tx.commit()
except Exception as e:
request_inspection(customer_id, other_bank_id, amount, e)
raise# roll backdefcustomer_balance_check(tx, customer_id, amount):
query = (""" MATCH (c:Customer {id: $id}) RETURN c.balance >= $amount AS sufficient """)
result = tx.run(query, id=customer_id, amount=amount)
record = result.single(strict=True)
return record["sufficient"]
defother_bank_transfer_api(customer_id, other_bank_id, amount):
# make some API call to other bankpassdefdecrease_customer_balance(tx, customer_id, amount):
query = (""" MATCH (c:Customer {id: $id}) SET c.balance = c.balance - $amount """)
result = tx.run(query, id=customer_id, amount=amount)
result.consume()
defrequest_inspection(customer_id, other_bank_id, amount, e):
# manual cleanup required; log this or similarprint("WARNING: transaction rolled back due to exception:", repr(e))
print("customer_id:", customer_id, "other_bank_id:", other_bank_id, "amount:", amount)
if __name__ == "__main__":
main()
defcreate_people(tx):
result = tx.run(""" UNWIND ["Alice", "Bob"] AS name MERGE (p:Person {name: name}) """)
return result.consume()
with driver.session(database="<database-name>") as session:
result_summary = session.execute_write(create_people)