001/* 002 * VM-Operator 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.runner.qemu; 020 021import com.fasterxml.jackson.core.JsonProcessingException; 022import com.fasterxml.jackson.databind.node.ObjectNode; 023import java.io.IOException; 024import java.nio.file.Path; 025import java.time.Duration; 026import java.time.Instant; 027import java.util.LinkedList; 028import java.util.Queue; 029import java.util.logging.Level; 030import org.jdrupes.vmoperator.runner.qemu.Constants.ProcessName; 031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities; 032import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand; 033import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown; 034import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu; 035import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand; 036import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent; 037import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady; 038import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult; 039import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent; 040import org.jgrapes.core.Channel; 041import org.jgrapes.core.Components; 042import org.jgrapes.core.Components.Timer; 043import org.jgrapes.core.annotation.Handler; 044import org.jgrapes.core.events.Stop; 045import org.jgrapes.io.events.Closed; 046import org.jgrapes.io.events.ProcessExited; 047import org.jgrapes.net.SocketIOChannel; 048import org.jgrapes.util.events.ConfigurationUpdate; 049 050/** 051 * A component that handles the communication over the Qemu monitor 052 * socket. 053 * 054 * If the log level for this class is set to fine, the messages 055 * exchanged on the monitor socket are logged. 056 */ 057@SuppressWarnings("PMD.DataflowAnomalyAnalysis") 058public class QemuMonitor extends QemuConnector { 059 060 private int powerdownTimeout; 061 private final Queue<QmpCommand> executing = new LinkedList<>(); 062 private Instant powerdownStartedAt; 063 private Stop suspendedStop; 064 private Timer powerdownTimer; 065 private boolean powerdownConfirmed; 066 private boolean monitorReady; 067 068 /** 069 * Instantiates a new QEMU monitor. 070 * 071 * @param componentChannel the component channel 072 * @param configDir the config dir 073 * @throws IOException Signals that an I/O exception has occurred. 074 */ 075 @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic", 076 "PMD.ConstructorCallsOverridableMethod" }) 077 public QemuMonitor(Channel componentChannel, Path configDir) 078 throws IOException { 079 super(componentChannel); 080 attach(new RamController(channel())); 081 attach(new CpuController(channel())); 082 attach(new DisplayController(channel(), configDir)); 083 attach(new CdMediaController(channel())); 084 } 085 086 /** 087 * As the initial configuration of this component depends on the 088 * configuration of the {@link Runner}, it doesn't have a handler 089 * for the {@link ConfigurationUpdate} event. The values are 090 * forwarded from the {@link Runner} instead. 091 * 092 * @param socketPath the socket path 093 * @param powerdownTimeout 094 */ 095 /* default */ void configure(Path socketPath, int powerdownTimeout) { 096 super.configure(socketPath); 097 this.powerdownTimeout = powerdownTimeout; 098 } 099 100 /** 101 * When the socket is connected, send the capabilities command. 102 */ 103 @Override 104 protected void socketConnected() { 105 rep().fire(new MonitorCommand(new QmpCapabilities())); 106 } 107 108 @Override 109 protected void processInput(String line) 110 throws IOException { 111 logger.finer(() -> "monitor(in): " + line); 112 try { 113 var response = mapper.readValue(line, ObjectNode.class); 114 if (response.has("QMP")) { 115 monitorReady = true; 116 logger.fine(() -> "QMP connection ready"); 117 rep().fire(new MonitorReady()); 118 return; 119 } 120 if (response.has("return") || response.has("error")) { 121 QmpCommand executed = executing.poll(); 122 logger.finer( 123 () -> String.format("(Previous \"monitor(in)\" is result " 124 + "from executing %s)", executed)); 125 var monRes = MonitorResult.from(executed, response); 126 logger.fine(() -> "QMP triggers: " + monRes); 127 rep().fire(monRes); 128 return; 129 } 130 if (response.has("event")) { 131 MonitorEvent.from(response).ifPresent(me -> { 132 logger.fine(() -> "QMP triggers: " + me); 133 rep().fire(me); 134 }); 135 } 136 } catch (JsonProcessingException e) { 137 throw new IOException(e); 138 } 139 } 140 141 /** 142 * On closed. 143 * 144 * @param event the event 145 */ 146 @Handler 147 public void onClosed(Closed<?> event, SocketIOChannel channel) { 148 channel.associated(this, getClass()).ifPresent(qm -> { 149 super.onClosed(event, channel); 150 logger.fine(() -> "QMP connection closed."); 151 monitorReady = false; 152 }); 153 } 154 155 /** 156 * On monitor command. 157 * 158 * @param event the event 159 * @throws IOException 160 */ 161 @Handler 162 @SuppressWarnings({ "PMD.AvoidSynchronizedStatement", 163 "PMD.AvoidDuplicateLiterals" }) 164 public void onMonitorCommand(MonitorCommand event) throws IOException { 165 // Check prerequisites 166 if (!monitorReady && !(event.command() instanceof QmpCapabilities)) { 167 logger.severe(() -> "Premature QMP command (not ready): " 168 + event.command()); 169 rep().fire(new Stop()); 170 return; 171 } 172 173 // Send the command 174 var command = event.command(); 175 logger.fine(() -> "QMP handles: " + event.toString()); 176 String asText; 177 try { 178 asText = command.asText(); 179 logger.finer(() -> "monitor(out): " + asText); 180 } catch (JsonProcessingException e) { 181 logger.log(Level.SEVERE, e, 182 () -> "Cannot serialize Json: " + e.getMessage()); 183 return; 184 } 185 synchronized (executing) { 186 if (writer().isPresent()) { 187 executing.add(command); 188 sendCommand(asText); 189 } 190 } 191 } 192 193 /** 194 * Shutdown the VM. 195 * 196 * @param event the event 197 */ 198 @Handler(priority = 100) 199 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 200 public void onStop(Stop event) { 201 if (!monitorReady) { 202 logger.fine(() -> "Not sending QMP powerdown command" 203 + " because QMP connection is closed"); 204 return; 205 } 206 207 // We have a connection to Qemu, attempt ACPI shutdown if time left 208 powerdownStartedAt = event.associated(Instant.class).orElseGet(() -> { 209 var now = Instant.now(); 210 event.setAssociated(Instant.class, now); 211 return now; 212 }); 213 if (powerdownStartedAt.plusSeconds(powerdownTimeout) 214 .isBefore(Instant.now())) { 215 return; 216 } 217 event.suspendHandling(); 218 suspendedStop = event; 219 220 // Send command. If not confirmed, assume "hanging" qemu process. 221 powerdownTimer = Components.schedule(t -> { 222 logger.fine(() -> "QMP powerdown command not confirmed"); 223 synchronized (this) { 224 powerdownTimer = null; 225 if (suspendedStop != null) { 226 suspendedStop.resumeHandling(); 227 suspendedStop = null; 228 } 229 } 230 }, Duration.ofSeconds(5)); 231 logger.fine(() -> "Attempting QMP (ACPI) powerdown."); 232 rep().fire(new MonitorCommand(new QmpPowerdown())); 233 } 234 235 /** 236 * When the powerdown event is confirmed, wait for termination 237 * or timeout. Termination is detected by the qemu process exiting 238 * (see {@link #onProcessExited(ProcessExited)}). 239 * 240 * @param event the event 241 */ 242 @Handler 243 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 244 public void onPowerdownEvent(PowerdownEvent event) { 245 synchronized (this) { 246 // Cancel confirmation timeout 247 if (powerdownTimer != null) { 248 powerdownTimer.cancel(); 249 } 250 251 // (Re-)schedule timer as fallback 252 var waitUntil = powerdownStartedAt.plusSeconds(powerdownTimeout); 253 logger.fine(() -> "QMP powerdown confirmed, waiting for" 254 + " termination until " + waitUntil); 255 powerdownTimer = Components.schedule(t -> { 256 logger.fine(() -> "Powerdown timeout reached."); 257 synchronized (this) { 258 powerdownTimer = null; 259 if (suspendedStop != null) { 260 suspendedStop.resumeHandling(); 261 suspendedStop = null; 262 } 263 } 264 }, waitUntil); 265 powerdownConfirmed = true; 266 } 267 } 268 269 /** 270 * On process exited. 271 * 272 * @param event the event 273 */ 274 @Handler 275 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 276 public void onProcessExited(ProcessExited event) { 277 if (!event.startedBy().associated(CommandDefinition.class) 278 .map(cd -> ProcessName.QEMU.equals(cd.name())).orElse(false)) { 279 return; 280 } 281 synchronized (this) { 282 if (powerdownTimer != null) { 283 powerdownTimer.cancel(); 284 } 285 if (suspendedStop != null) { 286 suspendedStop.resumeHandling(); 287 suspendedStop = null; 288 } 289 } 290 } 291 292 /** 293 * On configure qemu. 294 * 295 * @param event the event 296 */ 297 @Handler 298 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 299 public void onConfigureQemu(ConfigureQemu event) { 300 int newTimeout = event.configuration().vm.powerdownTimeout; 301 if (powerdownTimeout != newTimeout) { 302 powerdownTimeout = newTimeout; 303 synchronized (this) { 304 if (powerdownTimer != null && powerdownConfirmed) { 305 powerdownTimer 306 .reschedule(powerdownStartedAt.plusSeconds(newTimeout)); 307 } 308 309 } 310 } 311 } 312 313}