001/*
002 * VM-Operator
003 * Copyright (C) 2025 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.databind.ObjectMapper;
022import java.io.IOException;
023import java.io.Writer;
024import java.lang.reflect.UndeclaredThrowableException;
025import java.net.UnixDomainSocketAddress;
026import java.nio.file.Files;
027import java.nio.file.Path;
028import java.util.Optional;
029import org.jgrapes.core.Channel;
030import org.jgrapes.core.Component;
031import org.jgrapes.core.EventPipeline;
032import org.jgrapes.core.annotation.Handler;
033import org.jgrapes.core.events.Start;
034import org.jgrapes.core.events.Stop;
035import org.jgrapes.io.events.Closed;
036import org.jgrapes.io.events.ConnectError;
037import org.jgrapes.io.events.Input;
038import org.jgrapes.io.events.OpenSocketConnection;
039import org.jgrapes.io.util.ByteBufferWriter;
040import org.jgrapes.io.util.LineCollector;
041import org.jgrapes.net.SocketIOChannel;
042import org.jgrapes.net.events.ClientConnected;
043import org.jgrapes.util.events.ConfigurationUpdate;
044import org.jgrapes.util.events.FileChanged;
045import org.jgrapes.util.events.WatchFile;
046
047/**
048 * A component that handles the communication with QEMU over a socket.
049 * 
050 * Derived classes should log the messages exchanged on the socket
051 * if the log level is set to fine.
052 */
053public abstract class QemuConnector extends Component {
054
055    @SuppressWarnings("PMD.FieldNamingConventions")
056    protected static final ObjectMapper mapper = new ObjectMapper();
057
058    private EventPipeline rep;
059    private Path socketPath;
060    private SocketIOChannel qemuChannel;
061
062    /**
063     * Instantiates a new QEMU connector.
064     *
065     * @param componentChannel the component channel
066     * @throws IOException Signals that an I/O exception has occurred.
067     */
068    public QemuConnector(Channel componentChannel) throws IOException {
069        super(componentChannel);
070    }
071
072    /**
073     * As the initial configuration of this component depends on the 
074     * configuration of the {@link Runner}, it doesn't have a handler 
075     * for the {@link ConfigurationUpdate} event. The values are 
076     * forwarded from the {@link Runner} instead.
077     *
078     * @param socketPath the socket path
079     */
080    /* default */ void configure(Path socketPath) {
081        this.socketPath = socketPath;
082        logger.fine(() -> getClass().getSimpleName()
083            + " configured with socketPath=" + socketPath);
084    }
085
086    /**
087     * Note the runner's event processor and delete the socket.
088     *
089     * @param event the event
090     * @throws IOException Signals that an I/O exception has occurred.
091     */
092    @Handler
093    public void onStart(Start event) throws IOException {
094        rep = event.associated(EventPipeline.class).get();
095        if (socketPath == null) {
096            return;
097        }
098        Files.deleteIfExists(socketPath);
099        fire(new WatchFile(socketPath));
100    }
101
102    /**
103     * Return the runner's event pipeline.
104     *
105     * @return the event pipeline
106     */
107    protected EventPipeline rep() {
108        return rep;
109    }
110
111    /**
112     * Watch for the creation of the swtpm socket and start the
113     * qemu process if it has been created.
114     *
115     * @param event the event
116     */
117    @Handler
118    public void onFileChanged(FileChanged event) {
119        if (event.change() == FileChanged.Kind.CREATED
120            && event.path().equals(socketPath)) {
121            // qemu running, open socket
122            fire(new OpenSocketConnection(
123                UnixDomainSocketAddress.of(socketPath))
124                    .setAssociated(getClass(), this));
125        }
126    }
127
128    /**
129     * Check if this is from opening the agent socket and if true,
130     * save the socket in the context and associate the channel with
131     * the context.
132     *
133     * @param event the event
134     * @param channel the channel
135     */
136    @SuppressWarnings("resource")
137    @Handler
138    public void onClientConnected(ClientConnected event,
139            SocketIOChannel channel) {
140        event.openEvent().associated(getClass()).ifPresent(qm -> {
141            qemuChannel = channel;
142            channel.setAssociated(getClass(), this);
143            channel.setAssociated(Writer.class, new ByteBufferWriter(
144                channel).nativeCharset());
145            channel.setAssociated(LineCollector.class,
146                new LineCollector()
147                    .consumer(line -> {
148                        try {
149                            processInput(line);
150                        } catch (IOException e) {
151                            throw new UndeclaredThrowableException(e);
152                        }
153                    }));
154            socketConnected();
155        });
156    }
157
158    /**
159     * Return the QEMU channel if the connection has been established.
160     *
161     * @return the socket IO channel
162     */
163    protected Optional<SocketIOChannel> qemuChannel() {
164        return Optional.ofNullable(qemuChannel);
165    }
166
167    /**
168     * Return the {@link Writer} for the connection if the connection
169     * has been established.
170     *
171     * @return the optional
172     */
173    protected Optional<Writer> writer() {
174        return qemuChannel().flatMap(c -> c.associated(Writer.class));
175    }
176
177    /**
178     * Send the given command to QEMU. A newline is appended to the
179     * command automatically.
180     *
181     * @param command the command
182     * @return true, if successful
183     * @throws IOException Signals that an I/O exception has occurred.
184     */
185    protected boolean sendCommand(String command) throws IOException {
186        if (writer().isEmpty()) {
187            return false;
188        }
189        writer().get().append(command).append('\n').flush();
190        return true;
191    }
192
193    /**
194     * Called when the connector has been connected to the socket.
195     */
196    @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
197    protected void socketConnected() {
198        // Default is to do nothing.
199    }
200
201    /**
202     * Called when a connection attempt fails.
203     *
204     * @param event the event
205     * @param channel the channel
206     */
207    @Handler
208    public void onConnectError(ConnectError event, SocketIOChannel channel) {
209        event.event().associated(getClass()).ifPresent(qm -> {
210            rep.fire(new Stop());
211        });
212    }
213
214    /**
215     * Handle data from the socket connection.
216     *
217     * @param event the event
218     * @param channel the channel
219     */
220    @Handler
221    public void onInput(Input<?> event, SocketIOChannel channel) {
222        if (channel.associated(getClass()).isEmpty()) {
223            return;
224        }
225        channel.associated(LineCollector.class).ifPresent(collector -> {
226            collector.feed(event);
227        });
228    }
229
230    /**
231     * Process agent input.
232     *
233     * @param line the line
234     * @throws IOException Signals that an I/O exception has occurred.
235     */
236    protected abstract void processInput(String line) throws IOException;
237
238    /**
239     * On closed.
240     *
241     * @param event the event
242     * @param channel the channel
243     */
244    @Handler
245    public void onClosed(Closed<?> event, SocketIOChannel channel) {
246        channel.associated(getClass()).ifPresent(qm -> {
247            qemuChannel = null;
248        });
249    }
250}