001/*
002 * VM-Operator
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.core.JsonProcessingException;
022import com.fasterxml.jackson.databind.node.ObjectNode;
023import java.io.IOException;
024import java.nio.file.Path;
025import java.time.Duration;
026import java.time.Instant;
027import java.util.LinkedList;
028import java.util.Queue;
029import java.util.logging.Level;
030import org.jdrupes.vmoperator.runner.qemu.Constants.ProcessName;
031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
032import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
033import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
034import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
035import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
036import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
037import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
038import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
039import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
040import org.jgrapes.core.Channel;
041import org.jgrapes.core.Components;
042import org.jgrapes.core.Components.Timer;
043import org.jgrapes.core.annotation.Handler;
044import org.jgrapes.core.events.Stop;
045import org.jgrapes.io.events.Closed;
046import org.jgrapes.io.events.ProcessExited;
047import org.jgrapes.net.SocketIOChannel;
048import org.jgrapes.util.events.ConfigurationUpdate;
049
050/**
051 * A component that handles the communication over the Qemu monitor
052 * socket.
053 * 
054 * If the log level for this class is set to fine, the messages 
055 * exchanged on the monitor socket are logged.
056 */
057public class QemuMonitor extends QemuConnector {
058
059    private int powerdownTimeout;
060    private final Queue<QmpCommand> executing = new LinkedList<>();
061    private Instant powerdownStartedAt;
062    private Stop suspendedStop;
063    private Timer powerdownTimer;
064    private boolean powerdownConfirmed;
065    private boolean monitorReady;
066
067    /**
068     * Instantiates a new QEMU monitor.
069     *
070     * @param componentChannel the component channel
071     * @param configDir the config dir
072     * @throws IOException Signals that an I/O exception has occurred.
073     */
074    public QemuMonitor(Channel componentChannel, Path configDir)
075            throws IOException {
076        super(componentChannel);
077        attach(new RamController(channel()));
078        attach(new CpuController(channel()));
079        attach(new DisplayController(channel(), configDir));
080        attach(new CdMediaController(channel()));
081    }
082
083    /**
084     * As the initial configuration of this component depends on the 
085     * configuration of the {@link Runner}, it doesn't have a handler 
086     * for the {@link ConfigurationUpdate} event. The values are 
087     * forwarded from the {@link Runner} instead.
088     *
089     * @param socketPath the socket path
090     * @param powerdownTimeout 
091     */
092    /* default */ void configure(Path socketPath, int powerdownTimeout) {
093        super.configure(socketPath);
094        this.powerdownTimeout = powerdownTimeout;
095    }
096
097    /**
098     * When the socket is connected, send the capabilities command.
099     */
100    @Override
101    protected void socketConnected() {
102        rep().fire(new MonitorCommand(new QmpCapabilities()));
103    }
104
105    @Override
106    protected void processInput(String line)
107            throws IOException {
108        logger.finer(() -> "monitor(in): " + line);
109        try {
110            var response = mapper.readValue(line, ObjectNode.class);
111            if (response.has("QMP")) {
112                monitorReady = true;
113                logger.fine(() -> "QMP connection ready");
114                rep().fire(new MonitorReady());
115                return;
116            }
117            if (response.has("return") || response.has("error")) {
118                QmpCommand executed = executing.poll();
119                logger.finer(
120                    () -> String.format("(Previous \"monitor(in)\" is result "
121                        + "from executing %s)", executed));
122                var monRes = MonitorResult.from(executed, response);
123                logger.fine(() -> "QMP triggers: " + monRes);
124                rep().fire(monRes);
125                return;
126            }
127            if (response.has("event")) {
128                MonitorEvent.from(response).ifPresent(me -> {
129                    logger.fine(() -> "QMP triggers: " + me);
130                    rep().fire(me);
131                });
132            }
133        } catch (JsonProcessingException e) {
134            throw new IOException(e);
135        }
136    }
137
138    /**
139     * On closed.
140     *
141     * @param event the event
142     */
143    @Handler
144    public void onClosed(Closed<?> event, SocketIOChannel channel) {
145        channel.associated(this, getClass()).ifPresent(qm -> {
146            super.onClosed(event, channel);
147            logger.fine(() -> "QMP connection closed.");
148            monitorReady = false;
149        });
150    }
151
152    /**
153     * On monitor command.
154     *
155     * @param event the event
156     * @throws IOException 
157     */
158    @Handler
159    @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
160        "PMD.AvoidDuplicateLiterals" })
161    public void onMonitorCommand(MonitorCommand event) throws IOException {
162        // Check prerequisites
163        if (!monitorReady && !(event.command() instanceof QmpCapabilities)) {
164            logger.severe(() -> "Premature QMP command (not ready): "
165                + event.command());
166            rep().fire(new Stop());
167            return;
168        }
169
170        // Send the command
171        var command = event.command();
172        logger.fine(() -> "QMP handles: " + event.toString());
173        String asText;
174        try {
175            asText = command.asText();
176            logger.finer(() -> "monitor(out): " + asText);
177        } catch (JsonProcessingException e) {
178            logger.log(Level.SEVERE, e,
179                () -> "Cannot serialize Json: " + e.getMessage());
180            return;
181        }
182        synchronized (executing) {
183            if (writer().isPresent()) {
184                executing.add(command);
185                sendCommand(asText);
186            }
187        }
188    }
189
190    /**
191     * Shutdown the VM.
192     *
193     * @param event the event
194     */
195    @Handler(priority = 100)
196    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
197    public void onStop(Stop event) {
198        if (!monitorReady) {
199            logger.fine(() -> "Not sending QMP powerdown command"
200                + " because QMP connection is closed");
201            return;
202        }
203
204        // We have a connection to Qemu, attempt ACPI shutdown if time left
205        powerdownStartedAt = event.associated(Instant.class).orElseGet(() -> {
206            var now = Instant.now();
207            event.setAssociated(Instant.class, now);
208            return now;
209        });
210        if (powerdownStartedAt.plusSeconds(powerdownTimeout)
211            .isBefore(Instant.now())) {
212            return;
213        }
214        event.suspendHandling();
215        suspendedStop = event;
216
217        // Send command. If not confirmed, assume "hanging" qemu process.
218        powerdownTimer = Components.schedule(t -> {
219            logger.fine(() -> "QMP powerdown command not confirmed");
220            synchronized (this) {
221                powerdownTimer = null;
222                if (suspendedStop != null) {
223                    suspendedStop.resumeHandling();
224                    suspendedStop = null;
225                }
226            }
227        }, Duration.ofSeconds(5));
228        logger.fine(() -> "Attempting QMP (ACPI) powerdown.");
229        rep().fire(new MonitorCommand(new QmpPowerdown()));
230    }
231
232    /**
233     * When the powerdown event is confirmed, wait for termination
234     * or timeout. Termination is detected by the qemu process exiting
235     * (see {@link #onProcessExited(ProcessExited)}).
236     *
237     * @param event the event
238     */
239    @Handler
240    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
241    public void onPowerdownEvent(PowerdownEvent event) {
242        synchronized (this) {
243            // Cancel confirmation timeout
244            if (powerdownTimer != null) {
245                powerdownTimer.cancel();
246            }
247
248            // (Re-)schedule timer as fallback
249            var waitUntil = powerdownStartedAt.plusSeconds(powerdownTimeout);
250            logger.fine(() -> "QMP powerdown confirmed, waiting for"
251                + " termination until " + waitUntil);
252            powerdownTimer = Components.schedule(t -> {
253                logger.fine(() -> "Powerdown timeout reached.");
254                synchronized (this) {
255                    powerdownTimer = null;
256                    if (suspendedStop != null) {
257                        suspendedStop.resumeHandling();
258                        suspendedStop = null;
259                    }
260                }
261            }, waitUntil);
262            powerdownConfirmed = true;
263        }
264    }
265
266    /**
267     * On process exited.
268     *
269     * @param event the event
270     */
271    @Handler
272    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
273    public void onProcessExited(ProcessExited event) {
274        if (!event.startedBy().associated(CommandDefinition.class)
275            .map(cd -> ProcessName.QEMU.equals(cd.name())).orElse(false)) {
276            return;
277        }
278        synchronized (this) {
279            if (powerdownTimer != null) {
280                powerdownTimer.cancel();
281            }
282            if (suspendedStop != null) {
283                suspendedStop.resumeHandling();
284                suspendedStop = null;
285            }
286        }
287    }
288
289    /**
290     * On configure qemu.
291     *
292     * @param event the event
293     */
294    @Handler
295    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
296    public void onConfigureQemu(ConfigureQemu event) {
297        int newTimeout = event.configuration().vm.powerdownTimeout;
298        if (powerdownTimeout != newTimeout) {
299            powerdownTimeout = newTimeout;
300            synchronized (this) {
301                if (powerdownTimer != null && powerdownConfirmed) {
302                    powerdownTimer
303                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
304                }
305
306            }
307        }
308    }
309
310}