Relationships creation not working #532

Open
aissaelouafi opened this issue Mar 28, 2022 · 8 comments

@aissaelouafi commented Mar 28, 2022

Hello,

I have an issue with relationship creation using the APOC procedure apoc.create.relationship.
I used this Cypher sink query to create the relationships:

streams.sink.topic.cypher.relationships-topic.to.test-db2=MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

I have this event data:

{
  "parent": "626a4f1b1be46810704d8055464bcbf1",
  "type_display": "Hosted by ::",
  "child": "4021d02f1b24ac10123c3035464bcb1e"
}

The idea is to match the nodes by their id, but the relationships are not created, even though I have checked that both nodes (matching the parent and child ids) exist.

It seems the query is not picked up by the plugin. I can't see any message in the logs regarding the query, but sometimes, after I restart the Neo4j server, I can see that the relationships were created.

Total number of nodes in the database: 736670
Nodes matching the parent id: 1
Nodes matching the child id: 1
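
To double-check the nodes and the query itself, I tested it manually in the Neo4j Browser (a minimal sketch; the ids come from the sample event above):

:params {event: {parent: '626a4f1b1be46810704d8055464bcbf1', type_display: 'Hosted by ::', child: '4021d02f1b24ac10123c3035464bcb1e'}}
MATCH (child), (parent)
WHERE child.id = $event.child AND parent.id = $event.parent
CALL apoc.create.relationship(parent, $event.type_display, {}, child) YIELD rel
RETURN rel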

I added these parameters to the streams.conf file:

streams.sink.enabled=true
streams.sink.enabled.to.test-db2=true
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.security.protocol=SSL

kafka.ssl.truststore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.truststore.password=test123
kafka.ssl.keystore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.keystore.password=test123

kafka.ssl.endpoint.identification.algorithm=HTTPS
streams.check.apoc.timeout=20000
streams.check.apoc.interval=10000
streams.sink.poll.interval=5000
kafka.streams.async.commit=true

kafka.group.id=neo4j_group
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

I don't understand why the query is not executed; when I run the query manually, the relationships are created correctly.

If you have any ideas, that would be very helpful.

I think it may be related to query plan caching. Sometimes I can see that the relationship was created, along with this message in the log file:

Discarded stale query from the query cache after 62549 seconds. Reason: NodesAllCardinality changed from 29300.0 to 308214.0, which is a divergence of 0.9049361807056137 which is greater than threshold 0.043763100387390126. Query: UNWIND $events AS event MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

Do you have any idea about this behavior?

Regards,
Aissa

@mroiter-larus (Contributor)

Hi @aissaelouafi,

I don't think this is a problem related to the Neo4j Streams plugin. That message means the data involved in the query has changed significantly since its previous execution, so the cached query plan may no longer be valid and a new one will be generated.

You probably need to change some Neo4j params regarding the heap size and the page cache size.

Furthermore, you can find more details about the query replanning process here.
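
For example, something like this in neo4j.conf (a sketch with illustrative values, to be sized for your dataset, not a recommendation):

# memory sizing (illustrative)
dbms.memory.heap.initial_size=4g
dbms.memory.heap.max_size=4g
dbms.memory.pagecache.size=8g

# if the frequent replanning itself is a concern, these settings control it
cypher.min_replan_interval=10s
cypher.statistics_divergence_threshold=0.75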

@aissaelouafi (Author) commented Mar 31, 2022

Hi @mroiter-larus,

Thanks a lot again for your comment.
I don't think it's related to the Neo4j cache; I think it's related to the Kafka parameters, because the nodes and relationships are created correctly when I send, for example, many thousands of events to the Kafka topic, but when I send only one event nothing happens and I don't see any message in the log file.

I tried changing kafka.max.poll.records=1 to force Kafka to poll each event individually. I know this value can cause memory/performance issues, but I still don't receive the event on the Neo4j side. I don't know if I should modify the neo4j.batch.size and neo4j.batch.timeout.msecs parameters as well. I also removed the streams.sink.poll.interval parameter.

This problem has persisted for several weeks and I really don't get any message in the log file.

Many thanks for your valuable comments, @mroiter-larus.

Regards,
Aissa

@mroiter-larus (Contributor)

Hi @aissaelouafi,

I'll investigate this case. In the meantime, I've noticed something in your streams configuration regarding the message commit: you have enabled async commit via the kafka.streams.async.commit=true parameter, but this only takes effect when Kafka auto commit is disabled via kafka.enable.auto.commit=false.

Another suggestion, regarding error management. You have set the following params:

streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

but unless you also set streams.sink.errors.tolerance=all, they have no effect.
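
Putting both points together, a consistent sketch of that part of streams.conf:

# commit handling: async commit requires auto commit to be disabled
kafka.enable.auto.commit=false
kafka.streams.async.commit=true

# error management: the log params only take effect with tolerance=all
streams.sink.errors.tolerance=all
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true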

Regards,

Mauro

@aissaelouafi (Author) commented Apr 1, 2022

Hello @mroiter-larus,

Thanks for your recommendations.

I just modified the parameters; the conf file now looks like this:

streams.sink.enabled=true
streams.sink.enabled.to.test-db2=true
streams.sink.errors.tolerance=all
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

# Kafka conf parameters
kafka.enable.auto.commit=false
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.security.protocol=SSL

# Kafka ssl certificates
kafka.ssl.truststore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.truststore.password=test123
kafka.ssl.keystore.location=/apps/neo4j/neo4j-enterprise/conf/syslogng.jks
kafka.ssl.keystore.password=test123

# Kafka poll
kafka.max.poll.records=1

kafka.ssl.endpoint.identification.algorithm=HTTPS
streams.check.apoc.timeout=2000
streams.check.apoc.interval=1000

kafka.group.id=neo4j_group
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

More relationships are being created with this configuration, but I still don't receive all the events I send to the Kafka topics. I'm subscribed to 4 Kafka topics as follows:

streams.sink.topic.pattern.node.topic-1.to.testdb-2=Incident{!sys_id}
streams.sink.topic.pattern.node.topic-2.to.testdb-2=Change{!sys_id}
streams.sink.topic.cypher.topic-3.to.testdb-2=call apoc.do.case(...)
streams.sink.topic.cypher.topic-4.to.testdb-2=MATCH (child),(parent) WHERE child.id=event.child AND parent.id=event.parent CALL apoc.create.relationship(parent, event.type_display, {}, child) YIELD rel RETURN rel

I suspect a delay on the Kafka consumer side, which is why I set kafka.max.poll.records=1 and removed the streams.sink.poll.interval parameter.

I have another question: is it possible to consume the same Kafka topic twice? I have the following data:

{
  "id": 1,
  "name": "Luka",
  "friend_of": 2
}

I want to create a Person node, for example, and also a friend_of relationship to the Person with id=2.

Regards,
Aissa

@aissaelouafi (Author) commented Apr 5, 2022

Hi @mroiter-larus,

Have you had time to investigate this case, please?
About the second part of the issue, I don't know if there is a way to execute two Cypher queries on the same event consumed from Kafka, e.g. to create both a node and a relationship from the same Kafka topic.

I want to do something like this:

streams.sink.topic.cypher.topic-1.to.testdb-2=MERGE (n:Incident {id: event.id}) ON CREATE SET n +=event.properties MATCH (c {id: event.cmdb_ci}) CREATE (n)-[r:IMPACTS]->(c)
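
As written, I suspect this wouldn't even parse, since Cypher requires a WITH between a SET and a following MATCH. A corrected sketch of the same idea (still assuming the event actually carries a properties map and a cmdb_ci id):

MERGE (n:Incident {id: event.id})
ON CREATE SET n += event.properties
WITH n
// if nothing matches cmdb_ci the row is dropped here (the MERGE above still persists)
MATCH (c {id: event.cmdb_ci})
CREATE (n)-[r:IMPACTS]->(c)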

Thanks a lot for your precious help.

Regards,
Aissa

@mroiter-larus (Contributor)

@aissaelouafi

I was able to replicate the issue and I'm still investigating.
About the second part of the issue: you can't consume events from a topic twice with the same consumer. You would have to force the creation of a new consumer, which is not possible with the plugin.

I'll keep you posted.

Mauro

@aissaelouafi (Author) commented Apr 7, 2022

Hi @mroiter-larus,

I just got this error message in the log file:

2022-04-07 14:15:14.235+0000 ERROR [s.StreamsSinkConfigurationListener] [db-2/79d0ad63] Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1151) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1081) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:937) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1473) ~[neo4j-streams-4.1.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1431) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaManualCommitEventConsumer.commitData(KafkaManualCommitEventConsumer.kt:84) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaManualCommitEventConsumer.read(KafkaManualCommitEventConsumer.kt:91) ~[neo4j-streams-4.1.0.jar:?]
        at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:159) [neo4j-streams-4.1.0.jar:?]
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678) [neo4j-streams-4.1.0.jar:?]
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665) [neo4j-streams-4.1.0.jar:?]

I think the issue is related to these parameters. Based on the message itself, I will try adjusting them as sketched below.
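
Following the error message's own suggestion, a sketch of what I will try (values are illustrative, not tested):

# give the poll loop more time before the group rebalances
kafka.max.poll.interval.ms=600000
# and/or let each poll return a normal-sized batch again
kafka.max.poll.records=500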

Regards,
Aissa

@aissaelouafi (Author) commented Apr 13, 2022

Hi @mroiter-larus,

I hope you are doing well.
Since we can't have two consumers on the same Kafka topic to create both the node and the relationship, I tried to write a single Cypher query that creates the node and the relationship at the same time, but it's not working.

I have this data:

{
  "id": 1,
  "name": "Test",
  "friend_of": 2
}

So I want to create a Person node with the attributes {id: 1, name: 'Test'} and a friend_of relationship to the Person node with id=2. This is the Cypher query I tried:

MERGE (n:Person {id: event.id}) ON CREATE SET n +=event.properties WITH n MATCH (p) WHERE p.id=event.friend_of CALL apoc.create.relationship (n, 'friend_of', {}, p) YIELD rel RETURN rel

I don't know if this kind of Cypher query is supported by the Kafka plugin, or maybe I have a syntax error in the query, because neither the node nor the relationship is created.
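
A variant I also want to try (a sketch: I assume the friend node is also :Person, set name explicitly since my event has no properties key, and use a plain MERGE because the relationship type is constant here):

MERGE (n:Person {id: event.id})
ON CREATE SET n.name = event.name
WITH n
// OPTIONAL MATCH keeps the row alive even if the friend doesn't exist yet
OPTIONAL MATCH (p:Person {id: event.friend_of})
// only create the relationship when the friend node was found
FOREACH (_ IN CASE WHEN p IS NULL THEN [] ELSE [1] END |
  MERGE (n)-[:FRIEND_OF]->(p))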

Do you have any news concerning the Kafka delay in consuming events?

Thanks a lot for your help.

Regards,
Aissa
