001/*
002 * VM-Operator
003 * Copyright (C) 2023 Michael N. Lipp
004 * 
005 * This program is free software: you can redistribute it and/or modify
006 * it under the terms of the GNU Affero General Public License as
007 * published by the Free Software Foundation, either version 3 of the
008 * License, or (at your option) any later version.
009 *
010 * This program is distributed in the hope that it will be useful,
011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
013 * GNU Affero General Public License for more details.
014 *
015 * You should have received a copy of the GNU Affero General Public License
016 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
017 */
018
019package org.jdrupes.vmoperator.runner.qemu;
020
021import com.fasterxml.jackson.core.JsonProcessingException;
022import com.fasterxml.jackson.databind.node.ObjectNode;
023import java.io.IOException;
024import java.nio.file.Path;
025import java.time.Duration;
026import java.time.Instant;
027import java.util.LinkedList;
028import java.util.Queue;
029import java.util.logging.Level;
030import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities;
031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
032import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown;
033import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu;
034import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand;
035import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent;
036import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
037import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
038import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
039import org.jgrapes.core.Channel;
040import org.jgrapes.core.Components;
041import org.jgrapes.core.Components.Timer;
042import org.jgrapes.core.annotation.Handler;
043import org.jgrapes.core.events.Stop;
044import org.jgrapes.io.events.Closed;
045import org.jgrapes.net.SocketIOChannel;
046import org.jgrapes.util.events.ConfigurationUpdate;
047
048/**
049 * A component that handles the communication over the Qemu monitor
050 * socket.
051 * 
052 * If the log level for this class is set to fine, the messages 
053 * exchanged on the monitor socket are logged.
054 */
055@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
056public class QemuMonitor extends QemuConnector {
057
058    private int powerdownTimeout;
059    private final Queue<QmpCommand> executing = new LinkedList<>();
060    private Instant powerdownStartedAt;
061    private Stop suspendedStop;
062    private Timer powerdownTimer;
063    private boolean powerdownConfirmed;
064
065    /**
066     * Instantiates a new QEMU monitor.
067     *
068     * @param componentChannel the component channel
069     * @param configDir the config dir
070     * @throws IOException Signals that an I/O exception has occurred.
071     */
072    @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
073        "PMD.ConstructorCallsOverridableMethod" })
074    public QemuMonitor(Channel componentChannel, Path configDir)
075            throws IOException {
076        super(componentChannel);
077        attach(new RamController(channel()));
078        attach(new CpuController(channel()));
079        attach(new DisplayController(channel(), configDir));
080        attach(new CdMediaController(channel()));
081    }
082
083    /**
084     * As the initial configuration of this component depends on the 
085     * configuration of the {@link Runner}, it doesn't have a handler 
086     * for the {@link ConfigurationUpdate} event. The values are 
087     * forwarded from the {@link Runner} instead.
088     *
089     * @param socketPath the socket path
090     * @param powerdownTimeout 
091     */
092    /* default */ void configure(Path socketPath, int powerdownTimeout) {
093        super.configure(socketPath);
094        this.powerdownTimeout = powerdownTimeout;
095    }
096
097    /**
098     * When the socket is connected, send the capabilities command.
099     */
100    @Override
101    protected void socketConnected() {
102        fire(new MonitorCommand(new QmpCapabilities()));
103    }
104
105    @Override
106    protected void processInput(String line)
107            throws IOException {
108        logger.fine(() -> "monitor(in): " + line);
109        try {
110            var response = mapper.readValue(line, ObjectNode.class);
111            if (response.has("QMP")) {
112                rep().fire(new MonitorReady());
113                return;
114            }
115            if (response.has("return") || response.has("error")) {
116                QmpCommand executed = executing.poll();
117                logger.fine(
118                    () -> String.format("(Previous \"monitor(in)\" is result "
119                        + "from executing %s)", executed));
120                rep().fire(MonitorResult.from(executed, response));
121                return;
122            }
123            if (response.has("event")) {
124                MonitorEvent.from(response).ifPresent(rep()::fire);
125            }
126        } catch (JsonProcessingException e) {
127            throw new IOException(e);
128        }
129    }
130
131    /**
132     * On closed.
133     *
134     * @param event the event
135     */
136    @Handler
137    @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
138        "PMD.AvoidDuplicateLiterals" })
139    public void onClosed(Closed<?> event, SocketIOChannel channel) {
140        super.onClosed(event, channel);
141        channel.associated(QemuMonitor.class).ifPresent(qm -> {
142            synchronized (this) {
143                if (powerdownTimer != null) {
144                    powerdownTimer.cancel();
145                }
146                if (suspendedStop != null) {
147                    suspendedStop.resumeHandling();
148                    suspendedStop = null;
149                }
150            }
151        });
152    }
153
154    /**
155     * On monitor command.
156     *
157     * @param event the event
158     * @throws IOException 
159     */
160    @Handler
161    @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
162        "PMD.AvoidSynchronizedStatement" })
163    public void onExecQmpCommand(MonitorCommand event) throws IOException {
164        var command = event.command();
165        logger.fine(() -> "monitor(out): " + command.toString());
166        String asText;
167        try {
168            asText = command.asText();
169        } catch (JsonProcessingException e) {
170            logger.log(Level.SEVERE, e,
171                () -> "Cannot serialize Json: " + e.getMessage());
172            return;
173        }
174        synchronized (executing) {
175            if (writer().isPresent()) {
176                executing.add(command);
177                sendCommand(asText);
178            }
179        }
180    }
181
182    /**
183     * Shutdown the VM.
184     *
185     * @param event the event
186     */
187    @Handler(priority = 100)
188    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
189    public void onStop(Stop event) {
190        if (qemuChannel() != null) {
191            // We have a connection to Qemu, attempt ACPI shutdown.
192            event.suspendHandling();
193            suspendedStop = event;
194
195            // Attempt powerdown command. If not confirmed, assume
196            // "hanging" qemu process.
197            powerdownTimer = Components.schedule(t -> {
198                // Powerdown not confirmed
199                logger.fine(() -> "QMP powerdown command has not effect.");
200                synchronized (this) {
201                    powerdownTimer = null;
202                    if (suspendedStop != null) {
203                        suspendedStop.resumeHandling();
204                        suspendedStop = null;
205                    }
206                }
207            }, Duration.ofSeconds(1));
208            logger.fine(() -> "Attempting QMP powerdown.");
209            powerdownStartedAt = Instant.now();
210            fire(new MonitorCommand(new QmpPowerdown()));
211        }
212    }
213
214    /**
215     * On powerdown event.
216     *
217     * @param event the event
218     */
219    @Handler
220    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
221    public void onPowerdownEvent(PowerdownEvent event) {
222        synchronized (this) {
223            // Cancel confirmation timeout
224            if (powerdownTimer != null) {
225                powerdownTimer.cancel();
226            }
227
228            // (Re-)schedule timer as fallback
229            logger.fine(() -> "QMP powerdown confirmed, waiting...");
230            powerdownTimer = Components.schedule(t -> {
231                logger.fine(() -> "Powerdown timeout reached.");
232                synchronized (this) {
233                    if (suspendedStop != null) {
234                        suspendedStop.resumeHandling();
235                        suspendedStop = null;
236                    }
237                }
238            }, powerdownStartedAt.plusSeconds(powerdownTimeout));
239            powerdownConfirmed = true;
240        }
241    }
242
243    /**
244     * On configure qemu.
245     *
246     * @param event the event
247     */
248    @Handler
249    @SuppressWarnings("PMD.AvoidSynchronizedStatement")
250    public void onConfigureQemu(ConfigureQemu event) {
251        int newTimeout = event.configuration().vm.powerdownTimeout;
252        if (powerdownTimeout != newTimeout) {
253            powerdownTimeout = newTimeout;
254            synchronized (this) {
255                if (powerdownTimer != null && powerdownConfirmed) {
256                    powerdownTimer
257                        .reschedule(powerdownStartedAt.plusSeconds(newTimeout));
258                }
259
260            }
261        }
262    }
263
264}