001/*
002 * VM-Operator
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.core.JsonProcessingException;
022import com.fasterxml.jackson.databind.node.ObjectNode;
023import java.io.IOException;
024import java.nio.file.Path;
025import java.time.Duration;
026import java.time.Instant;
027import java.util.LinkedList;
028import java.util.Queue;
029import java.util.logging.Level;
030import org.jdrupes.vmoperator.runner.qemu.Constants.ProcessName;
031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
032import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
033import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
034import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
035import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
036import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
037import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
038import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
039import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
040import org.jgrapes.core.Channel;
041import org.jgrapes.core.Components;
042import org.jgrapes.core.Components.Timer;
043import org.jgrapes.core.annotation.Handler;
044import org.jgrapes.core.events.Stop;
045import org.jgrapes.io.events.Closed;
046import org.jgrapes.io.events.ProcessExited;
047import org.jgrapes.net.SocketIOChannel;
048import org.jgrapes.util.events.ConfigurationUpdate;
049
050/**
051 * A component that handles the communication over the Qemu monitor
052 * socket.
053 * 
054 * If the log level for this class is set to fine, the messages 
055 * exchanged on the monitor socket are logged.
056 */
057@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
058public class QemuMonitor extends QemuConnector {
059
060    private int powerdownTimeout;
061    private final Queue<QmpCommand> executing = new LinkedList<>();
062    private Instant powerdownStartedAt;
063    private Stop suspendedStop;
064    private Timer powerdownTimer;
065    private boolean powerdownConfirmed;
066    private boolean monitorReady;
067
068    /**
069     * Instantiates a new QEMU monitor.
070     *
071     * @param componentChannel the component channel
072     * @param configDir the config dir
073     * @throws IOException Signals that an I/O exception has occurred.
074     */
075    @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
076        "PMD.ConstructorCallsOverridableMethod" })
077    public QemuMonitor(Channel componentChannel, Path configDir)
078            throws IOException {
079        super(componentChannel);
080        attach(new RamController(channel()));
081        attach(new CpuController(channel()));
082        attach(new DisplayController(channel(), configDir));
083        attach(new CdMediaController(channel()));
084    }
085
086    /**
087     * As the initial configuration of this component depends on the 
088     * configuration of the {@link Runner}, it doesn't have a handler 
089     * for the {@link ConfigurationUpdate} event. The values are 
090     * forwarded from the {@link Runner} instead.
091     *
092     * @param socketPath the socket path
093     * @param powerdownTimeout 
094     */
095    /* default */ void configure(Path socketPath, int powerdownTimeout) {
096        super.configure(socketPath);
097        this.powerdownTimeout = powerdownTimeout;
098    }
099
100    /**
101     * When the socket is connected, send the capabilities command.
102     */
103    @Override
104    protected void socketConnected() {
105        rep().fire(new MonitorCommand(new QmpCapabilities()));
106    }
107
108    @Override
109    protected void processInput(String line)
110            throws IOException {
111        logger.finer(() -> "monitor(in): " + line);
112        try {
113            var response = mapper.readValue(line, ObjectNode.class);
114            if (response.has("QMP")) {
115                monitorReady = true;
116                logger.fine(() -> "QMP connection ready");
117                rep().fire(new MonitorReady());
118                return;
119            }
120            if (response.has("return") || response.has("error")) {
121                QmpCommand executed = executing.poll();
122                logger.finer(
123                    () -> String.format("(Previous \"monitor(in)\" is result "
124                        + "from executing %s)", executed));
125                var monRes = MonitorResult.from(executed, response);
126                logger.fine(() -> "QMP triggers: " + monRes);
127                rep().fire(monRes);
128                return;
129            }
130            if (response.has("event")) {
131                MonitorEvent.from(response).ifPresent(me -> {
132                    logger.fine(() -> "QMP triggers: " + me);
133                    rep().fire(me);
134                });
135            }
136        } catch (JsonProcessingException e) {
137            throw new IOException(e);
138        }
139    }
140
141    /**
142     * On closed.
143     *
144     * @param event the event
145     */
146    @Handler
147    public void onClosed(Closed<?> event, SocketIOChannel channel) {
148        channel.associated(this, getClass()).ifPresent(qm -> {
149            super.onClosed(event, channel);
150            logger.fine(() -> "QMP connection closed.");
151            monitorReady = false;
152        });
153    }
154
155    /**
156     * On monitor command.
157     *
158     * @param event the event
159     * @throws IOException 
160     */
161    @Handler
162    @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
163        "PMD.AvoidDuplicateLiterals" })
164    public void onMonitorCommand(MonitorCommand event) throws IOException {
165        // Check prerequisites
166        if (!monitorReady && !(event.command() instanceof QmpCapabilities)) {
167            logger.severe(() -> "Premature QMP command (not ready): "
168                + event.command());
169            rep().fire(new Stop());
170            return;
171        }
172
173        // Send the command
174        var command = event.command();
175        logger.fine(() -> "QMP handles: " + event.toString());
176        String asText;
177        try {
178            asText = command.asText();
179            logger.finer(() -> "monitor(out): " + asText);
180        } catch (JsonProcessingException e) {
181            logger.log(Level.SEVERE, e,
182                () -> "Cannot serialize Json: " + e.getMessage());
183            return;
184        }
185        synchronized (executing) {
186            if (writer().isPresent()) {
187                executing.add(command);
188                sendCommand(asText);
189            }
190        }
191    }
192
193    /**
194     * Shutdown the VM.
195     *
196     * @param event the event
197     */
198    @Handler(priority = 100)
199    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
200    public void onStop(Stop event) {
201        if (!monitorReady) {
202            logger.fine(() -> "Not sending QMP powerdown command"
203                + " because QMP connection is closed");
204            return;
205        }
206
207        // We have a connection to Qemu, attempt ACPI shutdown if time left
208        powerdownStartedAt = event.associated(Instant.class).orElseGet(() -> {
209            var now = Instant.now();
210            event.setAssociated(Instant.class, now);
211            return now;
212        });
213        if (powerdownStartedAt.plusSeconds(powerdownTimeout)
214            .isBefore(Instant.now())) {
215            return;
216        }
217        event.suspendHandling();
218        suspendedStop = event;
219
220        // Send command. If not confirmed, assume "hanging" qemu process.
221        powerdownTimer = Components.schedule(t -> {
222            logger.fine(() -> "QMP powerdown command not confirmed");
223            synchronized (this) {
224                powerdownTimer = null;
225                if (suspendedStop != null) {
226                    suspendedStop.resumeHandling();
227                    suspendedStop = null;
228                }
229            }
230        }, Duration.ofSeconds(5));
231        logger.fine(() -> "Attempting QMP (ACPI) powerdown.");
232        rep().fire(new MonitorCommand(new QmpPowerdown()));
233    }
234
235    /**
236     * When the powerdown event is confirmed, wait for termination
237     * or timeout. Termination is detected by the qemu process exiting
238     * (see {@link #onProcessExited(ProcessExited)}).
239     *
240     * @param event the event
241     */
242    @Handler
243    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
244    public void onPowerdownEvent(PowerdownEvent event) {
245        synchronized (this) {
246            // Cancel confirmation timeout
247            if (powerdownTimer != null) {
248                powerdownTimer.cancel();
249            }
250
251            // (Re-)schedule timer as fallback
252            var waitUntil = powerdownStartedAt.plusSeconds(powerdownTimeout);
253            logger.fine(() -> "QMP powerdown confirmed, waiting for"
254                + " termination until " + waitUntil);
255            powerdownTimer = Components.schedule(t -> {
256                logger.fine(() -> "Powerdown timeout reached.");
257                synchronized (this) {
258                    powerdownTimer = null;
259                    if (suspendedStop != null) {
260                        suspendedStop.resumeHandling();
261                        suspendedStop = null;
262                    }
263                }
264            }, waitUntil);
265            powerdownConfirmed = true;
266        }
267    }
268
269    /**
270     * On process exited.
271     *
272     * @param event the event
273     */
274    @Handler
275    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
276    public void onProcessExited(ProcessExited event) {
277        if (!event.startedBy().associated(CommandDefinition.class)
278            .map(cd -> ProcessName.QEMU.equals(cd.name())).orElse(false)) {
279            return;
280        }
281        synchronized (this) {
282            if (powerdownTimer != null) {
283                powerdownTimer.cancel();
284            }
285            if (suspendedStop != null) {
286                suspendedStop.resumeHandling();
287                suspendedStop = null;
288            }
289        }
290    }
291
292    /**
293     * On configure qemu.
294     *
295     * @param event the event
296     */
297    @Handler
298    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
299    public void onConfigureQemu(ConfigureQemu event) {
300        int newTimeout = event.configuration().vm.powerdownTimeout;
301        if (powerdownTimeout != newTimeout) {
302            powerdownTimeout = newTimeout;
303            synchronized (this) {
304                if (powerdownTimer != null && powerdownConfirmed) {
305                    powerdownTimer
306                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
307                }
308
309            }
310        }
311    }
312
313}