001/*
002 * VM-Operator
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.core.JsonProcessingException;
022import com.fasterxml.jackson.databind.node.ObjectNode;
023import java.io.IOException;
024import java.nio.file.Path;
025import java.time.Duration;
026import java.time.Instant;
027import java.util.LinkedList;
028import java.util.Queue;
029import java.util.logging.Level;
030import org.jdrupes.vmoperator.runner.qemu.Constants.ProcessName;
031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
032import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
033import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
034import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
035import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
036import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
037import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
038import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
039import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
040import org.jgrapes.core.Channel;
041import org.jgrapes.core.Components;
042import org.jgrapes.core.Components.Timer;
043import org.jgrapes.core.annotation.Handler;
044import org.jgrapes.core.events.Stop;
045import org.jgrapes.io.events.Closed;
046import org.jgrapes.io.events.ProcessExited;
047import org.jgrapes.net.SocketIOChannel;
048import org.jgrapes.util.events.ConfigurationUpdate;
049
050/**
051 * A component that handles the communication over the Qemu monitor
052 * socket.
053 * 
054 * If the log level for this class is set to fine, the messages 
055 * exchanged on the monitor socket are logged.
056 */
057@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
058public class QemuMonitor extends QemuConnector {
059
060    private int powerdownTimeout;
061    private final Queue<QmpCommand> executing = new LinkedList<>();
062    private Instant powerdownStartedAt;
063    private Stop suspendedStop;
064    private Timer powerdownTimer;
065    private boolean powerdownConfirmed;
066    private boolean monitorReady;
067
068    /**
069     * Instantiates a new QEMU monitor.
070     *
071     * @param componentChannel the component channel
072     * @param configDir the config dir
073     * @throws IOException Signals that an I/O exception has occurred.
074     */
075    @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
076        "PMD.ConstructorCallsOverridableMethod" })
077    public QemuMonitor(Channel componentChannel, Path configDir)
078            throws IOException {
079        super(componentChannel);
080        attach(new RamController(channel()));
081        attach(new CpuController(channel()));
082        attach(new DisplayController(channel(), configDir));
083        attach(new CdMediaController(channel()));
084    }
085
086    /**
087     * As the initial configuration of this component depends on the 
088     * configuration of the {@link Runner}, it doesn't have a handler 
089     * for the {@link ConfigurationUpdate} event. The values are 
090     * forwarded from the {@link Runner} instead.
091     *
092     * @param socketPath the socket path
093     * @param powerdownTimeout 
094     */
095    /* default */ void configure(Path socketPath, int powerdownTimeout) {
096        super.configure(socketPath);
097        this.powerdownTimeout = powerdownTimeout;
098    }
099
100    /**
101     * When the socket is connected, send the capabilities command.
102     */
103    @Override
104    protected void socketConnected() {
105        rep().fire(new MonitorCommand(new QmpCapabilities()));
106    }
107
108    @Override
109    protected void processInput(String line)
110            throws IOException {
111        logger.fine(() -> "monitor(in): " + line);
112        try {
113            var response = mapper.readValue(line, ObjectNode.class);
114            if (response.has("QMP")) {
115                monitorReady = true;
116                rep().fire(new MonitorReady());
117                return;
118            }
119            if (response.has("return") || response.has("error")) {
120                QmpCommand executed = executing.poll();
121                logger.fine(
122                    () -> String.format("(Previous \"monitor(in)\" is result "
123                        + "from executing %s)", executed));
124                rep().fire(MonitorResult.from(executed, response));
125                return;
126            }
127            if (response.has("event")) {
128                MonitorEvent.from(response).ifPresent(rep()::fire);
129            }
130        } catch (JsonProcessingException e) {
131            throw new IOException(e);
132        }
133    }
134
135    /**
136     * On closed.
137     *
138     * @param event the event
139     */
140    @Handler
141    public void onClosed(Closed<?> event, SocketIOChannel channel) {
142        channel.associated(this, getClass()).ifPresent(qm -> {
143            super.onClosed(event, channel);
144            logger.finer(() -> "QMP socket closed.");
145            monitorReady = false;
146        });
147    }
148
149    /**
150     * On monitor command.
151     *
152     * @param event the event
153     * @throws IOException 
154     */
155    @Handler
156    @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
157        "PMD.AvoidDuplicateLiterals" })
158    public void onMonitorCommand(MonitorCommand event) throws IOException {
159        // Check prerequisites
160        if (!monitorReady && !(event.command() instanceof QmpCapabilities)) {
161            logger.severe(() -> "Premature monitor command (not ready): "
162                + event.command());
163            rep().fire(new Stop());
164            return;
165        }
166
167        // Send the command
168        var command = event.command();
169        logger.fine(() -> "monitor(out): " + command.toString());
170        String asText;
171        try {
172            asText = command.asText();
173        } catch (JsonProcessingException e) {
174            logger.log(Level.SEVERE, e,
175                () -> "Cannot serialize Json: " + e.getMessage());
176            return;
177        }
178        synchronized (executing) {
179            if (writer().isPresent()) {
180                executing.add(command);
181                sendCommand(asText);
182            }
183        }
184    }
185
186    /**
187     * Shutdown the VM.
188     *
189     * @param event the event
190     */
191    @Handler(priority = 100)
192    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
193    public void onStop(Stop event) {
194        if (!monitorReady) {
195            logger.fine(() -> "No QMP connection,"
196                + " cannot send powerdown command");
197            return;
198        }
199
200        // We have a connection to Qemu, attempt ACPI shutdown if time left
201        powerdownStartedAt = event.associated(Instant.class).orElseGet(() -> {
202            var now = Instant.now();
203            event.setAssociated(Instant.class, now);
204            return now;
205        });
206        if (powerdownStartedAt.plusSeconds(powerdownTimeout)
207            .isBefore(Instant.now())) {
208            return;
209        }
210        event.suspendHandling();
211        suspendedStop = event;
212
213        // Send command. If not confirmed, assume "hanging" qemu process.
214        powerdownTimer = Components.schedule(t -> {
215            logger.fine(() -> "QMP powerdown command not confirmed");
216            synchronized (this) {
217                powerdownTimer = null;
218                if (suspendedStop != null) {
219                    suspendedStop.resumeHandling();
220                    suspendedStop = null;
221                }
222            }
223        }, Duration.ofSeconds(5));
224        logger.fine(() -> "Attempting QMP (ACPI) powerdown.");
225        rep().fire(new MonitorCommand(new QmpPowerdown()));
226    }
227
228    /**
229     * When the powerdown event is confirmed, wait for termination
230     * or timeout. Termination is detected by the qemu process exiting
231     * (see {@link #onProcessExited(ProcessExited)}).
232     *
233     * @param event the event
234     */
235    @Handler
236    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
237    public void onPowerdownEvent(PowerdownEvent event) {
238        synchronized (this) {
239            // Cancel confirmation timeout
240            if (powerdownTimer != null) {
241                powerdownTimer.cancel();
242            }
243
244            // (Re-)schedule timer as fallback
245            var waitUntil = powerdownStartedAt.plusSeconds(powerdownTimeout);
246            logger.fine(() -> "QMP powerdown confirmed, waiting for"
247                + " termination until " + waitUntil);
248            powerdownTimer = Components.schedule(t -> {
249                logger.fine(() -> "Powerdown timeout reached.");
250                synchronized (this) {
251                    powerdownTimer = null;
252                    if (suspendedStop != null) {
253                        suspendedStop.resumeHandling();
254                        suspendedStop = null;
255                    }
256                }
257            }, waitUntil);
258            powerdownConfirmed = true;
259        }
260    }
261
262    /**
263     * On process exited.
264     *
265     * @param event the event
266     */
267    @Handler
268    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
269    public void onProcessExited(ProcessExited event) {
270        if (!event.startedBy().associated(CommandDefinition.class)
271            .map(cd -> ProcessName.QEMU.equals(cd.name())).orElse(false)) {
272            return;
273        }
274        synchronized (this) {
275            if (powerdownTimer != null) {
276                powerdownTimer.cancel();
277            }
278            if (suspendedStop != null) {
279                suspendedStop.resumeHandling();
280                suspendedStop = null;
281            }
282        }
283    }
284
285    /**
286     * On configure qemu.
287     *
288     * @param event the event
289     */
290    @Handler
291    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
292    public void onConfigureQemu(ConfigureQemu event) {
293        int newTimeout = event.configuration().vm.powerdownTimeout;
294        if (powerdownTimeout != newTimeout) {
295            powerdownTimeout = newTimeout;
296            synchronized (this) {
297                if (powerdownTimer != null && powerdownConfirmed) {
298                    powerdownTimer
299                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
300                }
301
302            }
303        }
304    }
305
306}