001/* 002 * VM-Operator 003 * Copyright (C) 2025 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.runner.qemu; 020 021import com.fasterxml.jackson.databind.ObjectMapper; 022import java.io.IOException; 023import java.io.Writer; 024import java.lang.reflect.UndeclaredThrowableException; 025import java.net.UnixDomainSocketAddress; 026import java.nio.file.Files; 027import java.nio.file.Path; 028import java.util.Optional; 029import org.jgrapes.core.Channel; 030import org.jgrapes.core.Component; 031import org.jgrapes.core.EventPipeline; 032import org.jgrapes.core.annotation.Handler; 033import org.jgrapes.core.events.Start; 034import org.jgrapes.core.events.Stop; 035import org.jgrapes.io.events.Closed; 036import org.jgrapes.io.events.ConnectError; 037import org.jgrapes.io.events.Input; 038import org.jgrapes.io.events.OpenSocketConnection; 039import org.jgrapes.io.util.ByteBufferWriter; 040import org.jgrapes.io.util.LineCollector; 041import org.jgrapes.net.SocketIOChannel; 042import org.jgrapes.net.events.ClientConnected; 043import org.jgrapes.util.events.ConfigurationUpdate; 044import org.jgrapes.util.events.FileChanged; 045import org.jgrapes.util.events.WatchFile; 046 047/** 048 * A component that handles the communication with QEMU over a socket. 049 * 050 * Derived classes should log the messages exchanged on the socket 051 * if the log level is set to fine. 052 */ 053public abstract class QemuConnector extends Component { 054 055 @SuppressWarnings("PMD.FieldNamingConventions") 056 protected static final ObjectMapper mapper = new ObjectMapper(); 057 058 private EventPipeline rep; 059 private Path socketPath; 060 private SocketIOChannel qemuChannel; 061 062 /** 063 * Instantiates a new QEMU connector. 064 * 065 * @param componentChannel the component channel 066 * @throws IOException Signals that an I/O exception has occurred. 067 */ 068 public QemuConnector(Channel componentChannel) throws IOException { 069 super(componentChannel); 070 } 071 072 /** 073 * As the initial configuration of this component depends on the 074 * configuration of the {@link Runner}, it doesn't have a handler 075 * for the {@link ConfigurationUpdate} event. The values are 076 * forwarded from the {@link Runner} instead. 077 * 078 * @param socketPath the socket path 079 */ 080 /* default */ void configure(Path socketPath) { 081 this.socketPath = socketPath; 082 logger.fine(() -> getClass().getSimpleName() 083 + " configured with socketPath=" + socketPath); 084 } 085 086 /** 087 * Note the runner's event processor and delete the socket. 088 * 089 * @param event the event 090 * @throws IOException Signals that an I/O exception has occurred. 091 */ 092 @Handler 093 public void onStart(Start event) throws IOException { 094 rep = event.associated(EventPipeline.class).get(); 095 if (socketPath == null) { 096 return; 097 } 098 Files.deleteIfExists(socketPath); 099 fire(new WatchFile(socketPath)); 100 } 101 102 /** 103 * Return the runner's event pipeline. 104 * 105 * @return the event pipeline 106 */ 107 protected EventPipeline rep() { 108 return rep; 109 } 110 111 /** 112 * Watch for the creation of the swtpm socket and start the 113 * qemu process if it has been created. 114 * 115 * @param event the event 116 */ 117 @Handler 118 public void onFileChanged(FileChanged event) { 119 if (event.change() == FileChanged.Kind.CREATED 120 && event.path().equals(socketPath)) { 121 // qemu running, open socket 122 fire(new OpenSocketConnection( 123 UnixDomainSocketAddress.of(socketPath)) 124 .setAssociated(getClass(), this)); 125 } 126 } 127 128 /** 129 * Check if this is from opening the agent socket and if true, 130 * save the socket in the context and associate the channel with 131 * the context. 132 * 133 * @param event the event 134 * @param channel the channel 135 */ 136 @SuppressWarnings("resource") 137 @Handler 138 public void onClientConnected(ClientConnected event, 139 SocketIOChannel channel) { 140 event.openEvent().associated(getClass()).ifPresent(qm -> { 141 qemuChannel = channel; 142 channel.setAssociated(getClass(), this); 143 channel.setAssociated(Writer.class, new ByteBufferWriter( 144 channel).nativeCharset()); 145 channel.setAssociated(LineCollector.class, 146 new LineCollector() 147 .consumer(line -> { 148 try { 149 processInput(line); 150 } catch (IOException e) { 151 throw new UndeclaredThrowableException(e); 152 } 153 })); 154 socketConnected(); 155 }); 156 } 157 158 /** 159 * Return the QEMU channel if the connection has been established. 160 * 161 * @return the socket IO channel 162 */ 163 protected Optional<SocketIOChannel> qemuChannel() { 164 return Optional.ofNullable(qemuChannel); 165 } 166 167 /** 168 * Return the {@link Writer} for the connection if the connection 169 * has been established. 170 * 171 * @return the optional 172 */ 173 protected Optional<Writer> writer() { 174 return qemuChannel().flatMap(c -> c.associated(Writer.class)); 175 } 176 177 /** 178 * Send the given command to QEMU. A newline is appended to the 179 * command automatically. 180 * 181 * @param command the command 182 * @return true, if successful 183 * @throws IOException Signals that an I/O exception has occurred. 184 */ 185 protected boolean sendCommand(String command) throws IOException { 186 if (writer().isEmpty()) { 187 return false; 188 } 189 writer().get().append(command).append('\n').flush(); 190 return true; 191 } 192 193 /** 194 * Called when the connector has been connected to the socket. 195 */ 196 @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") 197 protected void socketConnected() { 198 // Default is to do nothing. 199 } 200 201 /** 202 * Called when a connection attempt fails. 203 * 204 * @param event the event 205 * @param channel the channel 206 */ 207 @Handler 208 public void onConnectError(ConnectError event, SocketIOChannel channel) { 209 event.event().associated(getClass()).ifPresent(qm -> { 210 rep.fire(new Stop()); 211 }); 212 } 213 214 /** 215 * Handle data from the socket connection. 216 * 217 * @param event the event 218 * @param channel the channel 219 */ 220 @Handler 221 public void onInput(Input<?> event, SocketIOChannel channel) { 222 if (channel.associated(getClass()).isEmpty()) { 223 return; 224 } 225 channel.associated(LineCollector.class).ifPresent(collector -> { 226 collector.feed(event); 227 }); 228 } 229 230 /** 231 * Process agent input. 232 * 233 * @param line the line 234 * @throws IOException Signals that an I/O exception has occurred. 235 */ 236 protected abstract void processInput(String line) throws IOException; 237 238 /** 239 * On closed. 240 * 241 * @param event the event 242 * @param channel the channel 243 */ 244 @Handler 245 public void onClosed(Closed<?> event, SocketIOChannel channel) { 246 channel.associated(getClass()).ifPresent(qm -> { 247 qemuChannel = null; 248 }); 249 } 250}