001/* 002 * VM-Operator 003 * Copyright (C) 2023 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.runner.qemu; 020 021import com.fasterxml.jackson.core.JsonProcessingException; 022import com.fasterxml.jackson.databind.node.ObjectNode; 023import java.io.IOException; 024import java.nio.file.Path; 025import java.time.Duration; 026import java.time.Instant; 027import java.util.LinkedList; 028import java.util.Queue; 029import java.util.logging.Level; 030import org.jdrupes.vmoperator.runner.qemu.Constants.ProcessName; 031import org.jdrupes.vmoperator.runner.qemu.commands.QmpCapabilities; 032import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand; 033import org.jdrupes.vmoperator.runner.qemu.commands.QmpPowerdown; 034import org.jdrupes.vmoperator.runner.qemu.events.ConfigureQemu; 035import org.jdrupes.vmoperator.runner.qemu.events.MonitorCommand; 036import org.jdrupes.vmoperator.runner.qemu.events.MonitorEvent; 037import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady; 038import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult; 039import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent; 040import org.jgrapes.core.Channel; 041import org.jgrapes.core.Components; 042import org.jgrapes.core.Components.Timer; 043import org.jgrapes.core.annotation.Handler; 044import org.jgrapes.core.events.Stop; 045import org.jgrapes.io.events.Closed; 046import org.jgrapes.io.events.ProcessExited; 047import org.jgrapes.net.SocketIOChannel; 048import org.jgrapes.util.events.ConfigurationUpdate; 049 050/** 051 * A component that handles the communication over the Qemu monitor 052 * socket. 053 * 054 * If the log level for this class is set to fine, the messages 055 * exchanged on the monitor socket are logged. 056 */ 057public class QemuMonitor extends QemuConnector { 058 059 private int powerdownTimeout; 060 private final Queue<QmpCommand> executing = new LinkedList<>(); 061 private Instant powerdownStartedAt; 062 private Stop suspendedStop; 063 private Timer powerdownTimer; 064 private boolean powerdownConfirmed; 065 private boolean monitorReady; 066 067 /** 068 * Instantiates a new QEMU monitor. 069 * 070 * @param componentChannel the component channel 071 * @param configDir the config dir 072 * @throws IOException Signals that an I/O exception has occurred. 073 */ 074 public QemuMonitor(Channel componentChannel, Path configDir) 075 throws IOException { 076 super(componentChannel); 077 attach(new RamController(channel())); 078 attach(new CpuController(channel())); 079 attach(new DisplayController(channel(), configDir)); 080 attach(new CdMediaController(channel())); 081 } 082 083 /** 084 * As the initial configuration of this component depends on the 085 * configuration of the {@link Runner}, it doesn't have a handler 086 * for the {@link ConfigurationUpdate} event. The values are 087 * forwarded from the {@link Runner} instead. 088 * 089 * @param socketPath the socket path 090 * @param powerdownTimeout 091 */ 092 /* default */ void configure(Path socketPath, int powerdownTimeout) { 093 super.configure(socketPath); 094 this.powerdownTimeout = powerdownTimeout; 095 } 096 097 /** 098 * When the socket is connected, send the capabilities command. 099 */ 100 @Override 101 protected void socketConnected() { 102 rep().fire(new MonitorCommand(new QmpCapabilities())); 103 } 104 105 @Override 106 protected void processInput(String line) 107 throws IOException { 108 logger.finer(() -> "monitor(in): " + line); 109 try { 110 var response = mapper.readValue(line, ObjectNode.class); 111 if (response.has("QMP")) { 112 monitorReady = true; 113 logger.fine(() -> "QMP connection ready"); 114 rep().fire(new MonitorReady()); 115 return; 116 } 117 if (response.has("return") || response.has("error")) { 118 QmpCommand executed = executing.poll(); 119 logger.finer( 120 () -> String.format("(Previous \"monitor(in)\" is result " 121 + "from executing %s)", executed)); 122 var monRes = MonitorResult.from(executed, response); 123 logger.fine(() -> "QMP triggers: " + monRes); 124 rep().fire(monRes); 125 return; 126 } 127 if (response.has("event")) { 128 MonitorEvent.from(response).ifPresent(me -> { 129 logger.fine(() -> "QMP triggers: " + me); 130 rep().fire(me); 131 }); 132 } 133 } catch (JsonProcessingException e) { 134 throw new IOException(e); 135 } 136 } 137 138 /** 139 * On closed. 140 * 141 * @param event the event 142 */ 143 @Handler 144 public void onClosed(Closed<?> event, SocketIOChannel channel) { 145 channel.associated(this, getClass()).ifPresent(qm -> { 146 super.onClosed(event, channel); 147 logger.fine(() -> "QMP connection closed."); 148 monitorReady = false; 149 }); 150 } 151 152 /** 153 * On monitor command. 154 * 155 * @param event the event 156 * @throws IOException 157 */ 158 @Handler 159 @SuppressWarnings({ "PMD.AvoidSynchronizedStatement", 160 "PMD.AvoidDuplicateLiterals" }) 161 public void onMonitorCommand(MonitorCommand event) throws IOException { 162 // Check prerequisites 163 if (!monitorReady && !(event.command() instanceof QmpCapabilities)) { 164 logger.severe(() -> "Premature QMP command (not ready): " 165 + event.command()); 166 rep().fire(new Stop()); 167 return; 168 } 169 170 // Send the command 171 var command = event.command(); 172 logger.fine(() -> "QMP handles: " + event.toString()); 173 String asText; 174 try { 175 asText = command.asText(); 176 logger.finer(() -> "monitor(out): " + asText); 177 } catch (JsonProcessingException e) { 178 logger.log(Level.SEVERE, e, 179 () -> "Cannot serialize Json: " + e.getMessage()); 180 return; 181 } 182 synchronized (executing) { 183 if (writer().isPresent()) { 184 executing.add(command); 185 sendCommand(asText); 186 } 187 } 188 } 189 190 /** 191 * Shutdown the VM. 192 * 193 * @param event the event 194 */ 195 @Handler(priority = 100) 196 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 197 public void onStop(Stop event) { 198 if (!monitorReady) { 199 logger.fine(() -> "Not sending QMP powerdown command" 200 + " because QMP connection is closed"); 201 return; 202 } 203 204 // We have a connection to Qemu, attempt ACPI shutdown if time left 205 powerdownStartedAt = event.associated(Instant.class).orElseGet(() -> { 206 var now = Instant.now(); 207 event.setAssociated(Instant.class, now); 208 return now; 209 }); 210 if (powerdownStartedAt.plusSeconds(powerdownTimeout) 211 .isBefore(Instant.now())) { 212 return; 213 } 214 event.suspendHandling(); 215 suspendedStop = event; 216 217 // Send command. If not confirmed, assume "hanging" qemu process. 218 powerdownTimer = Components.schedule(t -> { 219 logger.fine(() -> "QMP powerdown command not confirmed"); 220 synchronized (this) { 221 powerdownTimer = null; 222 if (suspendedStop != null) { 223 suspendedStop.resumeHandling(); 224 suspendedStop = null; 225 } 226 } 227 }, Duration.ofSeconds(5)); 228 logger.fine(() -> "Attempting QMP (ACPI) powerdown."); 229 rep().fire(new MonitorCommand(new QmpPowerdown())); 230 } 231 232 /** 233 * When the powerdown event is confirmed, wait for termination 234 * or timeout. Termination is detected by the qemu process exiting 235 * (see {@link #onProcessExited(ProcessExited)}). 236 * 237 * @param event the event 238 */ 239 @Handler 240 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 241 public void onPowerdownEvent(PowerdownEvent event) { 242 synchronized (this) { 243 // Cancel confirmation timeout 244 if (powerdownTimer != null) { 245 powerdownTimer.cancel(); 246 } 247 248 // (Re-)schedule timer as fallback 249 var waitUntil = powerdownStartedAt.plusSeconds(powerdownTimeout); 250 logger.fine(() -> "QMP powerdown confirmed, waiting for" 251 + " termination until " + waitUntil); 252 powerdownTimer = Components.schedule(t -> { 253 logger.fine(() -> "Powerdown timeout reached."); 254 synchronized (this) { 255 powerdownTimer = null; 256 if (suspendedStop != null) { 257 suspendedStop.resumeHandling(); 258 suspendedStop = null; 259 } 260 } 261 }, waitUntil); 262 powerdownConfirmed = true; 263 } 264 } 265 266 /** 267 * On process exited. 268 * 269 * @param event the event 270 */ 271 @Handler 272 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 273 public void onProcessExited(ProcessExited event) { 274 if (!event.startedBy().associated(CommandDefinition.class) 275 .map(cd -> ProcessName.QEMU.equals(cd.name())).orElse(false)) { 276 return; 277 } 278 synchronized (this) { 279 if (powerdownTimer != null) { 280 powerdownTimer.cancel(); 281 } 282 if (suspendedStop != null) { 283 suspendedStop.resumeHandling(); 284 suspendedStop = null; 285 } 286 } 287 } 288 289 /** 290 * On configure qemu. 291 * 292 * @param event the event 293 */ 294 @Handler 295 @SuppressWarnings("PMD.AvoidSynchronizedStatement") 296 public void onConfigureQemu(ConfigureQemu event) { 297 int newTimeout = event.configuration().vm.powerdownTimeout; 298 if (powerdownTimeout != newTimeout) { 299 powerdownTimeout = newTimeout; 300 synchronized (this) { 301 if (powerdownTimer != null && powerdownConfirmed) { 302 powerdownTimer 303 .reschedule(powerdownStartedAt.plusSeconds(newTimeout)); 304 } 305 306 } 307 } 308 } 309 310}