Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Illegal seek for FileIO.fromPath with named pipe #32134

Open
johanandren opened this issue Sep 27, 2023 · 4 comments
Open

Illegal seek for FileIO.fromPath with named pipe #32134

johanandren opened this issue Sep 27, 2023 · 4 comments
Labels

Comments

@johanandren
Copy link
Member

johanandren commented Sep 27, 2023

It seems to be not working for me (tried with mulitple versions):

java.io.IOException: Illegal seek
        at java.base/sun.nio.ch.FileDispatcherImpl.seek0(Native Method)
        at java.base/sun.nio.ch.FileDispatcherImpl.seek(FileDispatcherImpl.java:78)
        at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:361)
        at akka.stream.impl.io.FileSource$$anon$1.preStart(IOSources.scala:80)
        at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:295)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:557)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:679)
        at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:727)
        at akka.actor.Actor.aroundPreStart(Actor.scala:528)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:528)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:670)
        at akka.actor.ActorCell.create(ActorCell.scala:652)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)```

Originally posted by @syspulse in #25328 (comment)

@johanandren
Copy link
Member Author

If you can share a reproducer or clear steps to reproduce that would be useful @syspulse

@syspulse
Copy link

syspulse commented Sep 27, 2023

# uname -a
Linux  5.15.0-79-generic #86~20.04.2-Ubuntu SMP Mon Jul 17 23:27:17 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux
# mkfifo /tmp/pipe

ammonite (Ammonite Repl 2.5.4 (Scala 2.13.8 Java 11.0.5)):

import $ivy.`com.typesafe.akka::akka-stream:2.6.19`
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file.{Paths, Files}
import akka.actor.ActorSystem
implicit val as = ActorSystem("Pipe")
implicit val ec = scala.concurrent.ExecutionContext.global
val f = FileIO.fromPath(Paths.get("/tmp/pipe")).to(Sink.foreach{println}).run()

Push data:

echo "1" >/tmp/pipe
[ERROR] [09/27/2023 11:51:00.173] [Pipe-akka.actor.default-blocking-io-dispatcher-8] [akka://Pipe/system/Materializers/StreamSupervisor-0/flow-0-1-fileSource] Error during preStart in [akka.stream.impl.io.FileSource$$anon$2-fileSource]: Nicht erlaubter Seek
java.io.IOException: Nicht erlaubter Seek
        at java.base/sun.nio.ch.FileDispatcherImpl.seek0(Native Method)
        at java.base/sun.nio.ch.FileDispatcherImpl.seek(FileDispatcherImpl.java:78)
        at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:361)
        at akka.stream.impl.io.FileSource$$anon$2.preStart(IOSources.scala:74)
        at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:619)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:727)
        at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:776)
        at akka.actor.Actor.aroundPreStart(Actor.scala:548)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:548)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:716)
        at akka.actor.ActorCell.create(ActorCell.scala:644)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:514)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:536)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:295)
        at akka.dispatch.Mailbox.run(Mailbox.scala:230)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

@johanandren
Copy link
Member Author

In the FileSource we always read with a position, but that is not supported at all with fifos, additionally, opening a fifo with FileChannel.open will block the thread until there is a write on the other side.

I think we should detect fifos passed to it and fail early saying it is not supported. To stream from a fifo there would need to be a special stream source, but this is the first time I have seen anyone talking about it so it is unlikely to be something we add in core akka streams.

@johanandren
Copy link
Member Author

For the record a workaround would be to consume the fifo with a FileInputStream and adapt to stream using StreamConverters.fromInputStream (which expects and handles that the interactions are blocking)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants