![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/mautic.corals.io/app/bundles/LeadBundle/Model/ |
<?php

namespace Mautic\LeadBundle\Model;

use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\ORMException;
use Mautic\CoreBundle\Helper\Chart\ChartQuery;
use Mautic\CoreBundle\Helper\Chart\LineChart;
use Mautic\CoreBundle\Helper\CoreParametersHelper;
use Mautic\CoreBundle\Helper\DateTimeHelper;
use Mautic\CoreBundle\Helper\InputHelper;
use Mautic\CoreBundle\Helper\PathsHelper;
use Mautic\CoreBundle\Helper\UserHelper;
use Mautic\CoreBundle\Model\FormModel;
use Mautic\CoreBundle\Model\NotificationModel;
use Mautic\CoreBundle\ProcessSignal\ProcessSignalService;
use Mautic\CoreBundle\Security\Permissions\CorePermissions;
use Mautic\CoreBundle\Translation\Translator;
use Mautic\LeadBundle\Entity\Company;
use Mautic\LeadBundle\Entity\Import;
use Mautic\LeadBundle\Entity\ImportRepository;
use Mautic\LeadBundle\Entity\LeadEventLog;
use Mautic\LeadBundle\Entity\LeadEventLogRepository;
use Mautic\LeadBundle\Event\ImportEvent;
use Mautic\LeadBundle\Event\ImportProcessEvent;
use Mautic\LeadBundle\Exception\ImportDelayedException;
use Mautic\LeadBundle\Exception\ImportFailedException;
use Mautic\LeadBundle\Helper\Progress;
use Mautic\LeadBundle\LeadEvents;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException;
use Symfony\Component\Routing\Generator\UrlGeneratorInterface;
use Symfony\Contracts\EventDispatcher\Event;

/**
 * Model driving CSV imports: picks the next Import entity to run, enforces the
 * parallel import limit, reads the CSV file row by row and dispatches an
 * ImportProcessEvent for every data row.
 *
 * @extends FormModel<Import>
 */
class ImportModel extends FormModel
{
    protected LeadEventLogRepository $leadEventLogRepo;

    public function __construct(
        protected PathsHelper $pathsHelper,
        protected LeadModel $leadModel,
        protected NotificationModel $notificationModel,
        protected CoreParametersHelper $config,
        protected CompanyModel $companyModel,
        EntityManagerInterface $em,
        CorePermissions $security,
        EventDispatcherInterface $dispatcher,
        UrlGeneratorInterface $router,
        Translator $translator,
        UserHelper $userHelper,
        LoggerInterface $mauticLogger,
        private ProcessSignalService $processSignalService
    ) {
        $this->leadEventLogRepo = $leadModel->getEventLogRepository();

        parent::__construct($em, $security, $dispatcher, $router, $translator, $userHelper, $mauticLogger, $config);
    }

    /**
     * Returns the Import entity which should be processed next.
     *
     * Asks the repository for a single import in the QUEUED or DELAYED status.
     *
     * @return Import|null
     */
    public function getImportToProcess()
    {
        $result = $this->getRepository()->getImportsWithStatuses([Import::QUEUED, Import::DELAYED], 1);

        if (isset($result[0]) && $result[0] instanceof Import) {
            return $result[0];
        }

        return null;
    }

    /**
     * Compares current number of imports in progress with the limit from the configuration.
     *
     * @return bool true when another import may start, false when the limit is reached
     */
    public function checkParallelImportLimit(): bool
    {
        $parallelImportLimit = $this->getParallelImportLimit();
        $importsInProgress   = $this->getRepository()->countImportsInProgress();

        return $importsInProgress < $parallelImportLimit;
    }

    /**
     * Returns parallel import limit from the configuration.
     *
     * The configured value is cast to int so the documented return type holds
     * even when the configuration stores the limit as a numeric string.
     *
     * @param int $default Value used when 'parallel_import_limit' is not configured
     *
     * @return int
     */
    public function getParallelImportLimit($default = 1)
    {
        return (int) $this->config->get('parallel_import_limit', $default);
    }

    /**
     * Generates a HTML link to the import detail.
     *
     * The URL and the original file name are HTML-escaped before being placed
     * into the markup; the file name is presumably user supplied and must not
     * be able to inject markup into the notification that embeds this link.
     */
    public function generateLink(Import $import): string
    {
        $url = $this->router->generate(
            'mautic_import_action',
            ['objectAction' => 'view', 'object' => 'lead', 'objectId' => $import->getId()]
        );

        $escape = static fn ($value): string => htmlspecialchars((string) $value, ENT_QUOTES | ENT_SUBSTITUTE, 'UTF-8');

        return '<a href="'.$escape($url).'" data-toggle="ajax">'.$escape($import->getOriginalFile()).' ('.$import->getId().')</a>';
    }

    /**
     * Check if there are some IN_PROGRESS imports which got stuck for a while.
     * Set those as failed.
     *
     * Every affected import is marked FAILED, gets an explanatory status info,
     * has its file removed and - when the import has a creator - the creator is
     * notified. Returns null in every case.
     */
    public function setGhostImportsAsFailed()
    {
        // NOTE(review): unit of the delay is defined by ImportRepository::getGhostImports() - confirm there.
        $ghostDelay = 2;
        $imports    = $this->getRepository()->getGhostImports($ghostDelay, 5);

        if (empty($imports)) {
            return;
        }

        foreach ($imports as $import) {
            $import->setStatus($import::FAILED)
                ->setStatusInfo($this->translator->trans('mautic.lead.import.ghost.limit.hit', ['%limit%' => $ghostDelay]))
                ->removeFile();

            if ($import->getCreatedBy()) {
                $this->notificationModel->addNotification(
                    $this->translator->trans(
                        'mautic.lead.import.result.info',
                        ['%import%' => $this->generateLink($import)]
                    ),
                    'info',
                    false,
                    $this->translator->trans('mautic.lead.import.failed'),
                    'ri-download-line',
                    null,
                    $this->em->getReference(\Mautic\UserBundle\Entity\User::class, $import->getCreatedBy())
                );
            }
        }

        $this->saveEntities($imports);
    }

    /**
     * Start import. This is meant for the CLI command since it will import
     * the whole file at once.
     *
     * @param int $limit Number of records to import before delaying the import. 0 will import all
     *
     * @throws ImportFailedException  when the import cannot proceed, processing fails or the database throws
     * @throws ImportDelayedException when the parallel import limit is hit
     */
    public function beginImport(Import $import, Progress $progress, $limit = 0): void
    {
        $this->setGhostImportsAsFailed();

        if (!$import->canProceed()) {
            $this->saveEntity($import);
            $msg = 'import cannot be processed because '.$import->getStatusInfo();
            $this->logDebug($msg, $import);
            throw new ImportFailedException($msg);
        }

        if (!$this->checkParallelImportLimit()) {
            $info = $this->translator->trans(
                'mautic.lead.import.parallel.limit.hit',
                ['%limit%' => $this->getParallelImportLimit()]
            );
            $import->setStatus($import::DELAYED)->setStatusInfo($info);
            $this->saveEntity($import);
            $msg = 'import is delayed because parrallel limit was hit. '.$import->getStatusInfo();
            $this->logDebug($msg, $import);
            throw new ImportDelayedException($msg);
        }

        $processed = $import->getProcessedRows();
        $total     = $import->getLineCount();
        $pending   = $total - $processed;

        // When only a part of the pending rows will be handled in this run,
        // report progress relative to that part instead of the whole file.
        if ($limit && $limit < $pending) {
            $processed = 0;
            $total     = $limit;
        }

        $progress->setTotal($total);
        $progress->setDone($processed);

        $import->start();

        // Save the start changes so the user could see it
        $this->saveEntity($import);
        $this->logDebug('The background import is about to start', $import);

        try {
            if (!$this->process($import, $progress, $limit)) {
                throw new ImportFailedException($import->getStatusInfo());
            }
        } catch (ORMException $e) {
            // The EntityManager is probably closed. The entity cannot be saved.
            $info = $this->translator->trans(
                'mautic.lead.import.database.exception',
                ['%message%' => $e->getMessage()]
            );

            $import->setStatus($import::DELAYED)->setStatusInfo($info);

            throw new ImportFailedException('Database had been overloaded');
        }

        $import->end();
        $this->logDebug('The background import has ended', $import);

        // Save the end changes so the user could see it
        $this->saveEntity($import);

        if ($import->getCreatedBy()) {
            $this->notificationModel->addNotification(
                $this->translator->trans(
                    'mautic.lead.import.result.info',
                    ['%import%' => $this->generateLink($import)]
                ),
                'info',
                false,
                $this->translator->trans('mautic.lead.import.completed'),
                'ri-download-line',
                null,
                $this->em->getReference(\Mautic\UserBundle\Entity\User::class, $import->getCreatedBy())
            );
        }
    }

    /**
     * Import the CSV file from configuration in the $import entity.
     *
     * Reading resumes after the last imported line stored on the entity. The
     * loop ends when the file ends, the $limit is reached, a process signal is
     * caught or the import gets unpublished. When lines remain afterwards the
     * import is marked DELAYED.
     *
     * @param int $limit Number of records to import before delaying the import
     *
     * @return bool false when the file cannot be opened, true otherwise
     *
     * @throws ORMException when a row fails while the entity manager is closed
     */
    public function process(Import $import, Progress $progress, $limit = 0): bool
    {
        try {
            $file = new \SplFileObject($import->getFilePath());
        } catch (\Exception $e) {
            $import->setStatusInfo('SplFileObject cannot read the file. '.$e->getMessage());
            $import->setStatus(Import::FAILED);
            $this->logDebug('import cannot be processed because '.$import->getStatusInfo(), $import);

            return false;
        }

        $lastImportedLine = $import->getLastLineImported();
        $headers          = $import->getHeaders();
        $headerCount      = count($headers);
        $config           = $import->getParserConfig();
        $counter          = 0;

        $file->seek($lastImportedLine);

        $lineNumber = $lastImportedLine + 1;
        $this->logDebug('The import is starting on line '.$lineNumber, $import);

        $batchSize = $config['batchlimit'];

        // Convert to field names
        array_walk($headers, static function (&$val): void {
            $val = strtolower(InputHelper::alphanum($val, false, '_'));
        });

        while ($batchSize && !$file->eof()) {
            $string = $file->current();
            $file->next();
            $data = str_getcsv($string, $config['delimiter'], $config['enclosure'], $config['escape']);
            $import->setLastLineImported($lineNumber);

            // Ignore the header row
            if (1 === $lineNumber) {
                ++$lineNumber;
                continue;
            }

            // Ensure the progress is changing
            ++$lineNumber;
            --$batchSize;
            $progress->increase();

            $errorMessage = null;
            // NOTE(review): $lineNumber was already incremented above, so the logged line is one
            // higher than the value just stored via setLastLineImported() - confirm this is intended.
            $eventLog     = $this->initEventLog($import, $lineNumber);

            if ($this->isEmptyCsvRow($data)) {
                $errorMessage = 'mautic.lead.import.error.line_empty';
            }

            // Also pads $data with empty strings when the row is shorter than the header.
            if ($this->hasMoreValuesThanColumns($data, $headerCount)) {
                $errorMessage = 'mautic.lead.import.error.header_mismatch';
            }

            if (!$errorMessage) {
                $data = $this->trimArrayValues($data);
                if (!array_filter($data)) {
                    // NOTE(review): this skips the per-batch save, the signal check and the $limit
                    // counter below for rows that are blank after trimming - confirm this is intended.
                    continue;
                }

                $data = array_combine($headers, $data);

                try {
                    $event = new ImportProcessEvent($import, $eventLog, $data);

                    $this->dispatcher->dispatch($event, LeadEvents::IMPORT_ON_PROCESS);

                    if ($event->wasMerged()) {
                        $this->logDebug('Entity on line '.$lineNumber.' has been updated', $import);
                        $import->increaseUpdatedCount();
                    } else {
                        $this->logDebug('Entity on line '.$lineNumber.' has been created', $import);
                        $import->increaseInsertedCount();
                    }
                } catch (\Exception $e) {
                    // Email validation likely failed
                    $errorMessage = $e->getMessage();
                }
            }

            if ($errorMessage) {
                // Log the error first
                $import->increaseIgnoredCount();
                $this->logDebug('Line '.$lineNumber.' error: '.$errorMessage, $import);
                if (!$this->em->isOpen()) {
                    // Something bad must have happened if the entity manager is closed.
                    // We will not be able to save any entities.
                    throw new ORMException($errorMessage);
                }

                // This should be called only if the entity manager is open
                $this->logImportRowError($eventLog, $errorMessage);
            } else {
                $this->leadEventLogRepo->saveEntity($eventLog);
            }

            // Release entities in Doctrine's memory to prevent memory leak
            $this->em->detach($eventLog);
            if (null !== $leadEntity = $eventLog->getLead()) {
                $this->em->detach($leadEntity);

                $company        = $leadEntity->getCompany();
                $primaryCompany = $leadEntity->getPrimaryCompany();
                if ($company instanceof Company) {
                    $this->em->detach($company);
                }
                if ($primaryCompany instanceof Company) {
                    $this->em->detach($primaryCompany);
                }
            }

            $eventLog = null;
            $data     = null;

            // Save Import entity once per batch so the user could see the progress
            if (0 === $batchSize && $import->isBackgroundProcess()) {
                $isPublished = $this->getRepository()->getValue($import->getId(), 'is_published');

                if (!$isPublished) {
                    $import->setStatus($import::STOPPED);
                }

                $this->saveEntity($import);
                $this->dispatchEvent('batch_processed', $import);

                // Stop the import loop if the import got unpublished
                if (!$isPublished) {
                    $this->logDebug('The import has been unpublished. Stopping the import now.', $import);
                    break;
                }

                $batchSize = $config['batchlimit'];
            }

            if ($this->processSignalService->isSignalCaught()) {
                break;
            }

            ++$counter;
            if ($limit && $counter >= $limit) {
                break;
            }
        }

        if ($import->getLastLineImported() < $import->getLineCount()) {
            $import->setStatus($import::DELAYED);
            $this->saveEntity($import);
        }

        // Close the file
        $file = null;

        return true;
    }

    /**
     * Check if the CSV row has more values than the CSV header has columns.
     * If it is less, generate empty values for the rest of the missing values.
     * If it is more, return true.
     *
     * @param array $data        CSV row; padded in place with empty strings when it is too short
     * @param int   $headerCount
     */
    public function hasMoreValuesThanColumns(array &$data, $headerCount): bool
    {
        $dataCount = count($data);

        if ($headerCount !== $dataCount) {
            $diffCount = ($headerCount - $dataCount);

            if ($diffCount > 0) {
                // Fill in the data with empty string
                $fill = array_fill($dataCount, $diffCount, '');
                $data = $data + $fill;
            } else {
                return true;
            }
        }

        return false;
    }

    /**
     * Trim all values in a one dimensional array.
     */
    public function trimArrayValues(array $data): array
    {
        return array_map('trim', $data);
    }

    /**
     * Decide whether the CSV row is empty.
     *
     * A row counts as empty when it is not an array, has no elements, holds a
     * single empty-string/null value or contains only falsy values.
     *
     * @param mixed $row
     */
    public function isEmptyCsvRow($row): bool
    {
        if (!is_array($row) || empty($row)) {
            return true;
        }

        if (1 === count($row) && ('' === $row[0] || null === $row[0])) {
            return true;
        }

        return !array_filter($row);
    }

    /**
     * Save log about errored line.
     *
     * Stores the translated error under the 'error' property, sets the log
     * action to 'failed' and persists the log.
     *
     * @param string $errorMessage Message or translation key
     */
    public function logImportRowError(LeadEventLog $eventLog, $errorMessage): void
    {
        $eventLog->addProperty('error', $this->translator->trans($errorMessage))
            ->setAction('failed');

        $this->leadEventLogRepo->saveEntity($eventLog);
    }

    /**
     * Initialize LeadEventLog object and configure it as the import event.
     *
     * The log is not persisted here.
     *
     * @param int $lineNumber
     */
    public function initEventLog(Import $import, $lineNumber): LeadEventLog
    {
        $eventLog = new LeadEventLog();
        $eventLog->setUserId($import->getCreatedBy())
            ->setUserName($import->getCreatedByUser())
            ->setBundle($import->getObject())
            ->setObject('import')
            ->setObjectId($import->getId())
            ->setProperties(
                [
                    'line' => $lineNumber,
                    'file' => $import->getOriginalFile(),
                ]
            );

        return $eventLog;
    }

    /**
     * Get line chart data of imported rows.
     *
     * The given date objects are modified in place: the range is widened by one
     * minute on each side and the seconds are cleared.
     *
     * @param string $unit       {@link php.net/manual/en/function.date.php#refsect1-function.date-parameters}
     * @param string $dateFormat
     * @param array  $filter     Extra filters; 'object' and 'bundle' are always overwritten
     */
    public function getImportedRowsLineChartData($unit, \DateTimeInterface $dateFrom, \DateTimeInterface $dateTo, $dateFormat = null, $filter = []): array
    {
        $filter['object'] = 'import';
        $filter['bundle'] = 'lead';

        // Clear the times for display by minutes
        /** @var \DateTime $dateFrom */
        /** @var \DateTime $dateTo */
        $dateFrom->modify('-1 minute');
        $dateFrom->setTime((int) $dateFrom->format('H'), (int) $dateFrom->format('i'), 0);
        $dateTo->modify('+1 minute');
        $dateTo->setTime((int) $dateTo->format('H'), (int) $dateTo->format('i'), 0);

        $query = new ChartQuery($this->em->getConnection(), $dateFrom, $dateTo, $unit);
        $chart = new LineChart($unit, $dateFrom, $dateTo, $dateFormat);
        $data  = $query->fetchTimeData('lead_event_log', 'date_added', $filter);
        $chart->setDataset($this->translator->trans('mautic.lead.import.processed.rows'), $data);

        return $chart->render();
    }

    /**
     * Returns a list of failed rows for the import.
     *
     * @param int    $importId
     * @param string $object
     *
     * @return array|null null when no import ID is given
     */
    public function getFailedRows($importId = null, $object = 'lead')
    {
        if (!$importId) {
            return null;
        }

        return $this->getEventLogRepository()->getFailedRows($importId, ['select' => 'properties,id'], $object);
    }

    /**
     * @return ImportRepository
     */
    public function getRepository()
    {
        return $this->em->getRepository(Import::class);
    }

    /**
     * @return LeadEventLogRepository
     */
    public function getEventLogRepository()
    {
        return $this->em->getRepository(LeadEventLog::class);
    }

    public function getPermissionBase(): string
    {
        return 'lead:imports';
    }

    /**
     * Returns a unique name of a CSV file based on time.
     */
    public function getUniqueFileName(): string
    {
        return (new DateTimeHelper())->toUtcString('YmdHis').'.csv';
    }

    /**
     * Returns a full path to the import dir.
     */
    public function getImportDir(): string
    {
        return $this->pathsHelper->getImportLeadsPath();
    }

    /**
     * Get a specific entity or generate a new one if id is empty.
     */
    public function getEntity($id = null): ?Import
    {
        if (null === $id) {
            return new Import();
        }

        return parent::getEntity($id);
    }

    /**
     * Maps a model action to its LeadEvents name and dispatches it when the
     * event has listeners.
     *
     * @return Event|null null for an unknown action or when nobody listens
     *
     * @throws MethodNotAllowedHttpException when $entity is not an Import
     */
    protected function dispatchEvent($action, &$entity, $isNew = false, ?Event $event = null): ?Event
    {
        if (!$entity instanceof Import) {
            throw new MethodNotAllowedHttpException(['Import']);
        }

        switch ($action) {
            case 'pre_save':
                $name = LeadEvents::IMPORT_PRE_SAVE;
                break;
            case 'post_save':
                $name = LeadEvents::IMPORT_POST_SAVE;
                break;
            case 'pre_delete':
                $name = LeadEvents::IMPORT_PRE_DELETE;
                break;
            case 'post_delete':
                $name = LeadEvents::IMPORT_POST_DELETE;
                break;
            case 'batch_processed':
                $name = LeadEvents::IMPORT_BATCH_PROCESSED;
                break;
            default:
                return null;
        }

        if (!$this->dispatcher->hasListeners($name)) {
            return null;
        }

        if (empty($event)) {
            $event = new ImportEvent($entity, $isNew);
            $event->setEntityManager($this->em);
        }

        $this->dispatcher->dispatch($event, $name);

        return $event;
    }

    /**
     * Logs a debug message if in dev environment.
     *
     * @param string $msg
     */
    protected function logDebug($msg, ?Import $import = null)
    {
        if (MAUTIC_ENV === 'dev') {
            $importId = $import ? '('.$import->getId().')' : '';
            $this->logger->debug(sprintf('IMPORT%s: %s', $importId, $msg));
        }
    }
}